From b23b766d4697db85731293c8cd2df37d2cf094c3 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 17 Apr 2024 15:11:29 +0100 Subject: [PATCH 1/5] re-implements promises methods --- DESCRIPTION | 5 ++--- NAMESPACE | 1 + NEWS.md | 2 +- R/mirai-package.R | 5 +++-- R/mirai.R | 4 ++-- R/promises.R | 42 ++++++++++++++++++++++++++---------------- 6 files changed, 35 insertions(+), 24 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 7127afec6..fbdfdf9d3 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 0.13.2.9001 +Version: 0.13.2.9002 Description: Lightweight parallel code execution and distributed computing. Designed for simplicity, a 'mirai' evaluates an R expression asynchronously, on local or network resources, resolving automatically upon completion. @@ -23,13 +23,12 @@ Encoding: UTF-8 Depends: R (>= 3.6) Imports: - nanonext (>= 0.13.3) + nanonext (>= 0.13.6) Enhances: parallel, promises Suggests: knitr, - later, markdown VignetteBuilder: knitr RoxygenNote: 7.3.1 diff --git a/NAMESPACE b/NAMESPACE index a360c9962..9e5e80dcd 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -65,6 +65,7 @@ importFrom(nanonext,reap) importFrom(nanonext,recv) importFrom(nanonext,recv_aio_signal) importFrom(nanonext,request) +importFrom(nanonext,request_promise) importFrom(nanonext,request_signal) importFrom(nanonext,send) importFrom(nanonext,socket) diff --git a/NEWS.md b/NEWS.md index 0015f3432..2c6cd22c9 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# mirai 0.13.2.9001 (development) +# mirai 0.13.2.9002 (development) * `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng #110). diff --git a/R/mirai-package.R b/R/mirai-package.R index 1714fd7b5..6e4804740 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -44,8 +44,9 @@ #' #' @importFrom nanonext call_aio call_aio_ .context cv cv_value dial #' is_error_value listen lock mclock msleep next_config opt opt<- parse_url -#' pipe_notify random reap recv recv_aio_signal request request_signal send -#' socket stat stop_aio strcat tls_config unresolved until wait write_cert +#' pipe_notify random reap recv recv_aio_signal request request_promise +#' request_signal send socket stat stop_aio strcat tls_config unresolved +#' until wait write_cert #' @importFrom parallel nextRNGStream #' @importFrom stats rexp #' @importFrom utils .DollarNames diff --git a/R/mirai.R b/R/mirai.R index 0be41dd37..4d783e7fa 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -145,10 +145,10 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau envir <- ..[[.compute]] if (is.null(envir)) { sock <- ephemeral_daemon(local_url()) - aio <- request(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout) + aio <- request_promise(.context(sock), data = data, cv = NULL, send_mode = 1L, recv_mode = 1L, timeout = .timeout) `attr<-`(.subset2(aio, "aio"), "sock", sock) } else { - aio <- request_signal(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout) + aio <- request_promise(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout) } `class<-`(aio, c("mirai", "recvAio")) diff --git a/R/promises.R b/R/promises.R index 28ac2b499..2cdd54e8a 100644 --- a/R/promises.R +++ b/R/promises.R @@ -64,22 +64,32 @@ #' as.promise.mirai <- function(x) { - force(x) - promises::then( - promise = promises::promise( - function(resolve, reject) { - query <- function() - if (unresolved(x)) - later::later(query, delay = 0.1) else - resolve(.subset2(x, "value")) - query() - } - ), - onFulfilled = function(value) - if (is_error_value(value) && !is_mirai_interrupt(value)) - stop(value) else - value - ) + promise <- .subset2(x, "promise") + + if (is.null(promise)) { + + promise <- promises::promise( + function(resolve, reject) + assign("callback", + function(...) + if (is_error_value(value <- .subset2(x, "data")) && !is_mirai_interrupt(value)) + reject(value) else + resolve(value), + x) + ) + + value <- .subset2(x, "data") + + if (!unresolved(value)) + promise <- if (is_error_value(value) && !is_mirai_interrupt(value)) + promises::promise_reject(value) else + promises::promise_resolve(value) + + assign("promise", promise, x) + + } + + promise } From f9cd228084d0d96e707beccada4e9781418a3605 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 17 Apr 2024 22:04:25 +0100 Subject: [PATCH 2/5] simplify --- DESCRIPTION | 2 +- R/promises.R | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index fbdfdf9d3..b29450021 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -23,7 +23,7 @@ Encoding: UTF-8 Depends: R (>= 3.6) Imports: - nanonext (>= 0.13.6) + nanonext (>= 0.13.6.9000) Enhances: parallel, promises diff --git a/R/promises.R b/R/promises.R index 2cdd54e8a..8162fe138 100644 --- a/R/promises.R +++ b/R/promises.R @@ -70,20 +70,20 @@ as.promise.mirai <- function(x) { promise <- promises::promise( function(resolve, reject) - assign("callback", - function(...) - if (is_error_value(value <- .subset2(x, "data")) && !is_mirai_interrupt(value)) - reject(value) else - resolve(value), - x) + attr(x, "callback") <- function() { + value <- .subset2(x, "data") + if (is_error_value(value) && !is_mirai_interrupt(value)) + reject(value) else + resolve(value) + } ) - value <- .subset2(x, "data") - - if (!unresolved(value)) - promise <- if (is_error_value(value) && !is_mirai_interrupt(value)) - promises::promise_reject(value) else - promises::promise_resolve(value) + if (!unresolved(x)) { + value <- .subset2(x, "value") + promise <- if (is_error_value(value) && !is_mirai_interrupt(value)) + promises::promise_reject(value) else + promises::promise_resolve(value) + } assign("promise", promise, x) From c86159424386778bca15bbdd1360cb5d1e780e33 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 17 Apr 2024 22:38:13 +0100 Subject: [PATCH 3/5] preserve deep stacks for Shiny errors --- R/promises.R | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/R/promises.R b/R/promises.R index 8162fe138..584e2a243 100644 --- a/R/promises.R +++ b/R/promises.R @@ -68,14 +68,15 @@ as.promise.mirai <- function(x) { if (is.null(promise)) { - promise <- promises::promise( - function(resolve, reject) - attr(x, "callback") <- function() { - value <- .subset2(x, "data") - if (is_error_value(value) && !is_mirai_interrupt(value)) - reject(value) else - resolve(value) - } + promise <- promises::then( + promises::promise( + function(resolve, reject) + attr(x, "callback") <- function() resolve(.subset2(x, "data")) + ), + onFulfilled = function(value) + if (is_error_value(value) && !is_mirai_interrupt(value)) + stop(value) else + value ) if (!unresolved(x)) { From a61c07005c1d5f9d9514d6033c7b6c30beb8b3d7 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Wed, 17 Apr 2024 23:43:09 +0100 Subject: [PATCH 4/5] updates news --- DESCRIPTION | 2 +- NEWS.md | 3 ++- README.Rmd | 2 +- README.md | 12 ++++++------ 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index b29450021..1d4b6c192 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -23,7 +23,7 @@ Encoding: UTF-8 Depends: R (>= 3.6) Imports: - nanonext (>= 0.13.6.9000) + nanonext (>= 0.13.6.9001) Enhances: parallel, promises diff --git a/NEWS.md b/NEWS.md index 2c6cd22c9..a36acce43 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,7 @@ # mirai 0.13.2.9002 (development) -* `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng #110). +* Re-implements the promises method with completely event-driven (non-polling) promises (possible thanks to improvements in `nanonext` implemented with the help of @jcheng5) +* `stop_mirai()` now returns a 'miraiInterrupt' in the case the asynchronous task was still ongoing (thanks @jcheng5 #110). # mirai 0.13.2 diff --git a/README.Rmd b/README.Rmd index 4f8446806..be96480a6 100644 --- a/README.Rmd +++ b/README.Rmd @@ -152,7 +152,7 @@ We would like to thank in particular: [Will Landau](https://github.com/wlandau/), for being instrumental in shaping development of the package, from initiating the original request for persistent daemons, through to orchestrating robustness testing for the high performance computing requirements of `crew` and `targets`. -[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` method to make `mirai` work seamlessly within Shiny, and guidance for implementing error stack traces. +[Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` method to make `mirai` work seamlessly within Shiny, prototyping non-polling promises and guidance on implementing error stack traces. [Luke Tierney](https://github.com/ltierney/), R Core, for discussion on R's implementation of L'Ecuyer-CMRG streams, used to ensure statistical independence in parallel processing, and collaboration in 'providing an alternative communications backend for R'. diff --git a/README.md b/README.md index a62dff9c0..3f852cd4f 100644 --- a/README.md +++ b/README.md @@ -88,8 +88,8 @@ result. ``` r m$data -#> [1] 1.8451625 -0.2165991 -1.2117962 0.4428216 1.3387124 0.7469864 -#> [7] 2.2582459 -0.8252213 -4.6168235 0.5419577 +#> [1] 2.4181337 1.8674723 6.6228207 -1.0355541 0.8055614 1.2413703 +#> [7] -0.9656666 0.1509931 0.5354832 0.4135421 ``` Alternatively, explicitly call and wait for the result using @@ -97,8 +97,8 @@ Alternatively, explicitly call and wait for the result using ``` r call_mirai(m)$data -#> [1] 1.8451625 -0.2165991 -1.2117962 0.4428216 1.3387124 0.7469864 -#> [7] 2.2582459 -0.8252213 -4.6168235 0.5419577 +#> [1] 2.4181337 1.8674723 6.6228207 -1.0355541 0.8055614 1.2413703 +#> [7] -0.9656666 0.1509931 0.5354832 0.4135421 ``` ### Daemons @@ -184,8 +184,8 @@ for persistent daemons, through to orchestrating robustness testing for the high performance computing requirements of `crew` and `targets`. [Joe Cheng](https://github.com/jcheng5/), for optimising the `promises` -method to make `mirai` work seamlessly within Shiny, and guidance for -implementing error stack traces. +method to make `mirai` work seamlessly within Shiny, prototyping +non-polling promises and guidance on implementing error stack traces. [Luke Tierney](https://github.com/ltierney/), R Core, for discussion on R’s implementation of L’Ecuyer-CMRG streams, used to ensure statistical From 5e83033ff134c01a2370cdc611d778817825be31 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Thu, 18 Apr 2024 09:03:30 +0100 Subject: [PATCH 5/5] update for request_promise -> request2 --- DESCRIPTION | 2 +- NAMESPACE | 2 +- R/mirai-package.R | 2 +- R/mirai.R | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 1d4b6c192..5d780e966 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -23,7 +23,7 @@ Encoding: UTF-8 Depends: R (>= 3.6) Imports: - nanonext (>= 0.13.6.9001) + nanonext (>= 0.13.6.9002) Enhances: parallel, promises diff --git a/NAMESPACE b/NAMESPACE index 9e5e80dcd..a233b8ede 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -65,7 +65,7 @@ importFrom(nanonext,reap) importFrom(nanonext,recv) importFrom(nanonext,recv_aio_signal) importFrom(nanonext,request) -importFrom(nanonext,request_promise) +importFrom(nanonext,request2) importFrom(nanonext,request_signal) importFrom(nanonext,send) importFrom(nanonext,socket) diff --git a/R/mirai-package.R b/R/mirai-package.R index 6e4804740..d39ad0514 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -44,7 +44,7 @@ #' #' @importFrom nanonext call_aio call_aio_ .context cv cv_value dial #' is_error_value listen lock mclock msleep next_config opt opt<- parse_url -#' pipe_notify random reap recv recv_aio_signal request request_promise +#' pipe_notify random reap recv recv_aio_signal request request2 #' request_signal send socket stat stop_aio strcat tls_config unresolved #' until wait write_cert #' @importFrom parallel nextRNGStream diff --git a/R/mirai.R b/R/mirai.R index 4d783e7fa..4c1c1201c 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -145,10 +145,10 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .compute = "defau envir <- ..[[.compute]] if (is.null(envir)) { sock <- ephemeral_daemon(local_url()) - aio <- request_promise(.context(sock), data = data, cv = NULL, send_mode = 1L, recv_mode = 1L, timeout = .timeout) + aio <- request2(.context(sock), data = data, send_mode = 1L, recv_mode = 1L, timeout = .timeout) `attr<-`(.subset2(aio, "aio"), "sock", sock) } else { - aio <- request_promise(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout) + aio <- request2(.context(envir[["sock"]]), data = data, cv = envir[["cv"]], send_mode = 3L, recv_mode = 1L, timeout = .timeout) } `class<-`(aio, c("mirai", "recvAio"))