Skip to content

Commit f9f87eb

Browse files
committed
rework cleanup (mschubert#307, mschubert#323)
1 parent 48a2a23 commit f9f87eb

10 files changed

+33
-28
lines changed

R/pool.r

+4-4
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,10 @@ Pool = R6::R6Class("Pool",
101101
private$master$recv(timeout)
102102
},
103103

104-
cleanup = function(timeout=5000) {
104+
cleanup = function(timeout=5) {
105105
info = self$info()
106-
private$master$close(timeout)
107-
# ^^ replace with: (1) try close connections, and (2) close socket
106+
success = private$master$close(as.integer(timeout*1000))
107+
success = self$workers$cleanup(success, timeout) # timeout left?
108108

109109
max_mem = max(c(info$mem.max+2e8, 0), na.rm=TRUE) # add 200 Mb
110110
max_mem_str = format(structure(max_mem, class="object_size"), units="auto")
@@ -123,7 +123,7 @@ Pool = R6::R6Class("Pool",
123123
message(sprintf(fmt, rt3_str, 100*(rt[[1]]+rt[[2]])/rt[[3]],
124124
100*(wt[[1]]+wt[[2]])/wt[[3]], max_mem_str))
125125

126-
invisible(TRUE)
126+
invisible(success)
127127
},
128128

129129
workers = NULL

R/qsys.r

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ QSys = R6::R6Class("QSys",
3333
private$defaults = getOption("clustermq.defaults", list())
3434
},
3535

36+
cleanup = function(success, timeout) TRUE,
37+
3638
n = function() private$workers_total
3739
),
3840

@@ -44,6 +46,7 @@ QSys = R6::R6Class("QSys",
4446
template_file = NULL,
4547
workers_total = NULL,
4648
defaults = list(),
49+
is_cleaned_up = NULL,
4750

4851
fill_options = function(...) {
4952
values = utils::modifyList(private$defaults, list(...))

R/qsys_local.r

+1-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@ LOCAL = R6::R6Class("LOCAL",
1212
super$initialize(addr=addr, master=master)
1313
if (verbose)
1414
message("Running sequentially ('LOCAL') ...")
15-
},
16-
17-
cleanup = function(quiet=FALSE, timeout=3) {
18-
invisible(TRUE)
15+
private$is_cleaned_up = TRUE
1916
}
2017
)
2118
)

R/qsys_lsf.r

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ LSF = R6::R6Class("LSF",
2929
private$is_cleaned_up = FALSE
3030
},
3131

32-
cleanup = function() {
33-
private$is_cleaned_up = TRUE
32+
cleanup = function(success, timeout) {
33+
private$is_cleaned_up = success
34+
private$finalize()
3435
}
3536
),
3637

3738
private = list(
3839
job_id = NULL,
39-
is_cleaned_up = NULL,
4040

4141
finalize = function(quiet=self$workers_running == 0) {
4242
quiet = FALSE #TODO:

R/qsys_multicore.r

+6-4
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ MULTICORE = R6::R6Class("MULTICORE",
3131
private$children[[as.character(p$pid)]] = p
3232
}
3333
private$workers_total = n_jobs
34+
private$is_cleaned_up = FALSE
3435
},
3536

36-
cleanup = function(quiet=FALSE, timeout=3) {
37-
private$collect_children(wait=TRUE, timeout=timeout)
38-
invisible(length(private$children) == 0)
37+
cleanup = function(success, timeout) {
38+
private$collect_children(wait=TRUE, timeout=3L)
39+
private$is_cleaned_up = length(private$children) == 0
3940
}
4041
),
4142

@@ -50,7 +51,7 @@ MULTICORE = R6::R6Class("MULTICORE",
5051
children = list(),
5152

5253
finalize = function(quiet=FALSE) {
53-
if (length(private$children) > 0) {
54+
if (!private$is_cleaned_up) {
5455
private$collect_children(wait=FALSE, timeout=0)
5556
running = names(private$children)
5657
if (length(running) > 0) {
@@ -61,6 +62,7 @@ MULTICORE = R6::R6Class("MULTICORE",
6162
tools::pskill(running, tools::SIGKILL)
6263
}
6364
private$children = list()
65+
private$is_cleaned_up = TRUE
6466
}
6567
}
6668
)

R/qsys_multiprocess.r

+1-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ MULTIPROCESS = R6::R6Class("MULTIPROCESS",
3333
private$is_cleaned_up = FALSE
3434
},
3535

36-
cleanup = function(quiet=FALSE, timeout=3) {
36+
cleanup = function(success, timeout) {
3737
dead_workers = sapply(private$callr, function(x) ! x$is_alive())
3838
if (length(dead_workers) > 0)
3939
private$callr[dead_workers] = NULL
@@ -45,7 +45,6 @@ MULTIPROCESS = R6::R6Class("MULTIPROCESS",
4545

4646
private = list(
4747
callr = list(),
48-
is_cleaned_up = NULL,
4948

5049
finalize = function(quiet=FALSE) {
5150
if (!private$is_cleaned_up) {

R/qsys_sge.r

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ SGE = R6::R6Class("SGE",
3131
private$is_cleaned_up = FALSE
3232
},
3333

34-
cleanup = function() {
35-
private$is_cleaned_up = TRUE
34+
cleanup = function(success, timeout) {
35+
private$is_cleaned_up = success
36+
private$finalize()
3637
}
3738
),
3839

@@ -41,7 +42,6 @@ SGE = R6::R6Class("SGE",
4142
job_name = NULL,
4243
job_id = NULL,
4344
array_idx = "$TASK_ID",
44-
is_cleaned_up = NULL,
4545

4646
finalize = function(quiet = TRUE) { # self$workers_running == 0
4747
if (!private$is_cleaned_up) {

R/qsys_slurm.r

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ SLURM = R6::R6Class("SLURM",
2929
private$is_cleaned_up = FALSE
3030
},
3131

32-
cleanup = function() {
33-
private$is_cleaned_up = TRUE
32+
cleanup = function(success, timeout) {
33+
private$is_cleaned_up = success
34+
private$finalize()
3435
}
3536
),
3637

3738
private = list(
3839
job_id = NULL,
39-
is_cleaned_up = NULL,
4040

4141
finalize = function(quiet = TRUE) { # self$workers_running == 0
4242
if (!private$is_cleaned_up) {

src/CMQMaster.h

+7-3
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ class CMQMaster {
3232
Rcpp::stop("Could not bind port to any address in provided pool");
3333
}
3434

35-
void close(int timeout=1000) {
35+
bool close(int timeout=1000) {
3636
if (ctx == nullptr)
37-
return;
37+
return is_cleaned_up;
3838

3939
if (peers.find(cur) != peers.end()) {
4040
auto &w = peers[cur];
@@ -53,8 +53,10 @@ class CMQMaster {
5353
auto start = Time::now();
5454
while (time_left.count() > 0) {
5555
if (std::find_if(peers.begin(), peers.end(), [](const auto &w) {
56-
return w.second.status == wlife_t::active; }) == peers.end())
56+
return w.second.status == wlife_t::active; }) == peers.end()) {
57+
is_cleaned_up = true;
5758
break;
59+
}
5860

5961
try {
6062
int rc = zmq::poll(pitems, time_left);
@@ -89,6 +91,7 @@ class CMQMaster {
8991
ctx->close();
9092
ctx = nullptr;
9193
}
94+
return is_cleaned_up;
9295
}
9396

9497
SEXP recv(int timeout=-1) {
@@ -264,6 +267,7 @@ class CMQMaster {
264267
};
265268

266269
zmq::context_t *ctx {nullptr};
270+
bool is_cleaned_up {false};
267271
int pending_workers {0};
268272
zmq::socket_t sock;
269273
std::string cur;

tests/testthat/test-4-pool.r

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ test_that("starting and stopping multicore", {
1919
expect_equal(w$workers_total, 1)
2020
expect_error(w$send(1))
2121
expect_error(w$recv(500L))
22-
w$cleanup(5000L)
22+
w$cleanup()
2323
expect_equal(w$workers_running, 0)
2424
expect_equal(w$workers_total, 0)
2525
expect_error(w$send(2))
@@ -31,7 +31,7 @@ test_that("starting and stopping multicore", {
3131
test_that("pending workers area cleaned up properly", {
3232
skip_on_os("windows")
3333
w = workers(1, qsys_id="multicore")
34-
w$cleanup(5000L)
34+
w$cleanup()
3535
expect_equal(w$workers_running, 0)
3636
expect_equal(w$workers_total, 0)
3737
})

0 commit comments

Comments
 (0)