Skip to content

Commit

Permalink
Sketch #79
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed May 18, 2023
1 parent 37cc1da commit d4b5274
Show file tree
Hide file tree
Showing 19 changed files with 150 additions and 210 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export(crew_class_router)
export(crew_controller)
export(crew_controller_group)
export(crew_controller_local)
export(crew_deprecate)
export(crew_eval)
export(crew_launcher)
export(crew_launcher_local)
Expand Down
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
* Implement a `saturated()` controller method to support `targets`.
* Add a `worker_index` column to controller summaries.
* When relaunching workers, prioritize those with unresolved tasks stuck at the NNG level (#75, @shikokuchuo).
* Always relaunch backlogged inactive workers (#79).
* Deprecate the `auto_scale` argument/field of controllers in favor of the `scale` argument of `push()`, `pop()`, and `wait()`.
* Throttle auto-scaling with interval `self$router$seconds_interval` (#76).

# crew 0.1.1
Expand Down
40 changes: 40 additions & 0 deletions R/crew_assert.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,39 @@ crew_assert <- function(
invisible()
}

#' @title Deprecate a `crew` feature.
#' @export
#' @family utilities
#' @description Show an informative warning when a `crew` feature is
#' deprecated.
#' @return `NULL` (invisibly). Throws a warning if a feature is deprecated.
#' @param name Name of the feature (function or argument) to deprecate.
#' @param date Date of deprecation.
#' @param version Package version when deprecation was instated.
#' @param alternative Message about an alternative.
#' @examples
#' suppressWarnings(
#' crew_deprecate(
#' name = "auto_scale",
#' date = "2023-05-18",
#' version = "0.2.0",
#' alternative = "use the scale argument of push(), pop(), and wait()."
#' )
#' )
crew_deprecate <- function(name, date, version, alternative) {
message <- sprintf(
"%s was deprecated on %s (crew version %s). Alternative: %s.",
name,
date,
version,
alternative
)
crew_warn(
message = message,
class = c("crew_deprecate", "crew_warning", "crew")
)
}

crew_error <- function(message = NULL) {
crew_stop(
message = message,
Expand All @@ -60,6 +93,13 @@ crew_stop <- function(message, class) {
rlang::abort(message = message, class = class, call = emptyenv())
}

crew_warn <- function(message, class) {
old <- getOption("rlang_backtrace_on_error")
on.exit(options(rlang_backtrace_on_error = old))
options(rlang_backtrace_on_error = "none")
rlang::warn(message = message, class = class)
}

crew_message <- function(message) {
old <- getOption("rlang_backtrace_on_error")
on.exit(options(rlang_backtrace_on_error = old))
Expand Down
121 changes: 45 additions & 76 deletions R/crew_controller.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,8 @@
#' @param router An `R6` router object created by [crew_router()].
#' @param launcher An `R6` launcher object created by one of the
#' `crew_launcher_*()` functions such as [crew_launcher_local()].
#' @param auto_scale Character of length 1, name of the method for
#' automatically scaling workers to meet demand. `NULL` to default to
#' `"demand"`. Possible values include the following:
#' * `"demand"`: just after pushing a new task in `push()`, launch
#' `min(n, max(0, t - w))` workers, where `n` is the maximum number of
#' workers, `t` is the number of queued tasks, and `w` is the current
#' number of workers already running. In other words, scale up the
#' number of workers to meet the current demand.
#' If you trust tasks not to crash workers, this is a good choice.
#' But if you think a task may always crash a worker
#' (e.g. segmentation fault or maxed out memory) then
#' this could be somewhat risky because `mirai` resubmits
#' failed tasks behind the scenes and `crew` responds by
#' re-launching workers. If you are worried about this scenario,
#' choose `auto_scale = "one"` instead, which will only launch
#' up to one worker whenever a task is pushed.
#' * `"one"`: just after pushing a new task in `push()`, launch
#' a one worker if demand `min(n, max(0, t - w))` is greater than 0.
#' * `"none"`: do not auto-scale at all.
#' @param auto_scale Deprecated. Use the `scale` argument of `push()`,
#' `pop()`, and `wait()` instead.
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' crew_session_start()
Expand All @@ -41,13 +24,19 @@
crew_controller <- function(
router,
launcher,
auto_scale = "demand"
auto_scale = NULL
) {
auto_scale <- auto_scale %|||% c("demand", "one", "none")
if (!is.null(auto_scale)) {
crew_deprecate(
name = "auto_scale",
date = "2023-05-18",
version = "0.2.0",
alternative = "use the scale argument of push(), pop(), and wait()"
)
}
controller <- crew_class_controller$new(
router = router,
launcher = launcher,
auto_scale = auto_scale
launcher = launcher
)
controller$validate()
controller
Expand Down Expand Up @@ -78,10 +67,18 @@ crew_class_controller <- R6::R6Class(
inactive = function() {
daemons <- self$router$daemons
launching <- self$launcher$launching()
index <- which(is_inactive(daemons = daemons, launching = launching))
backlog <- self$router$assigned[index] - self$router$complete[index]
priority <- backlog > 0L
c(index[priority], index[!priority])
which(is_inactive(daemons = daemons, launching = launching))
},
scalable = function() {
daemons <- self$router$daemons
launching <- self$launcher$launching()
inactive <- is_inactive(daemons = daemons, launching = launching)
backlogged <- self$router$assigned > self$router$complete
list(
backlogged = which(inactive & backlogged),
resolved = which(inactive & (!backlogged)),
n_inactive = sum(inactive)
)
},
lost = function() {
daemons <- self$router$daemons
Expand Down Expand Up @@ -114,8 +111,6 @@ crew_class_controller <- R6::R6Class(
router = NULL,
#' @field launcher Launcher object.
launcher = NULL,
#' @field auto_scale Scaling method. See [crew_controller()].
auto_scale = NULL,
#' @field queue List of tasks in the queue.
queue = list(),
#' @field results List of finished tasks
Expand All @@ -133,7 +128,6 @@ crew_class_controller <- R6::R6Class(
#' @return An `R6` object with the controller object.
#' @param router Router object. See [crew_controller()].
#' @param launcher Launcher object. See [crew_controller()].
#' @param auto_scale Scaling method. See [crew_controller()].
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' crew_session_start()
Expand All @@ -149,12 +143,10 @@ crew_class_controller <- R6::R6Class(
#' }
initialize = function(
router = NULL,
launcher = NULL,
auto_scale = NULL
launcher = NULL
) {
self$router <- router
self$launcher <- launcher
self$auto_scale <- auto_scale
invisible()
},
#' @description Validate the router.
Expand Down Expand Up @@ -244,11 +236,10 @@ crew_class_controller <- R6::R6Class(
invisible()
},
#' @description Run auto-scaling.
#' @details This method is called during `push()`, and the method for
#' scaling up workers is governed by the `auto_scale`
#' argument of [crew_controller()]. It is not meant to be called
#' manually. If called manually, it is recommended to call `collect()`
#' first so `scale()` can accurately assess the demand.
#' @details This method is called during `push()`, `pop()`, and `wait()`
#' if the `scale` argument is `TRUE`.
#' If called manually, it is recommended to call `collect()`
#' first so `scale()` can accurately assess the task load.
#' For finer control of the number of workers launched,
#' call `launch()` on the controller with the exact desired
#' number of workers.
Expand All @@ -270,18 +261,14 @@ crew_class_controller <- R6::R6Class(
nanonext::msleep(10)
self$router$poll()
self$router$tally()
inactive <- private$inactive()
scalable <- private$scalable()
private$try_launch(inactive = scalable$backlogged, n = Inf)
available <- self$router$workers - length(scalable$resolved)
self$collect()
demand <- controller_demand(
tasks = length(self$queue),
workers = nrow(self$router$daemons) - length(inactive)
)
n <- controller_n_new_workers(
demand = demand,
auto_scale = self$auto_scale,
max = self$router$workers
)
private$try_launch(inactive = inactive, n = n)
deficit <- max(length(self$queue) - available, self$router$workers)
if (deficit > 0L) {
private$try_launch(inactive = scalable$resolved, n = deficit)
}
invisible()
},
#' @description Push a task to the head of the task list.
Expand Down Expand Up @@ -311,13 +298,10 @@ crew_class_controller <- R6::R6Class(
#' @param seconds_timeout Optional task timeout passed to the `.timeout`
#' argument of `mirai::mirai()` (after converting to milliseconds).
#' @param scale Logical, whether to automatically scale workers to meet
#' demand, depending on the `throttle` argument.
#' demand.
#' If `TRUE`, then `collect()` runs first
#' so demand can be properly assessed before scaling and the number
#' of workers is not too high.
#' @param throttle Whether to skip auto-scaling if auto-scaling
#' has already occurred recently
#' (within the last `self$router$seconds_interval` seconds).
#' @param name Optional name of the task. Replaced with a random name
#' if `NULL` or in conflict with an existing name in the task list.
#' @param controller Not used. Included to ensure the signature is
Expand All @@ -332,7 +316,6 @@ crew_class_controller <- R6::R6Class(
library = NULL,
seconds_timeout = NULL,
scale = TRUE,
throttle = TRUE,
name = NULL,
controller = NULL
) {
Expand Down Expand Up @@ -377,7 +360,7 @@ crew_class_controller <- R6::R6Class(
)
self$queue[[length(self$queue) + 1L]] <- task
if (scale) {
self$scale(throttle = throttle)
self$scale(throttle = TRUE)
}
invisible()
},
Expand Down Expand Up @@ -408,19 +391,16 @@ crew_class_controller <- R6::R6Class(
#' Otherwise, if there are no results available to collect,
#' the return value is `NULL`.
#' @param scale Logical, whether to automatically scale workers to meet
#' demand, depending on the `throttle` argument.
#' demand.
#' If `TRUE`, then `collect()` runs first
#' so demand can be properly assessed before scaling and the number
#' of workers is not too high. Scaling up on `pop()` may be important
#' for transient or nearly transient workers that tend to drop off
#' quickly after doing little work.
#' @param throttle Whether to skip auto-scaling if auto-scaling
#' has already occurred recently
#' (within the last `self$router$seconds_interval` seconds).
#' @param controllers Not used. Included to ensure the signature is
#' compatible with the analogous method of controller groups.
pop = function(scale = TRUE, throttle = TRUE, controllers = NULL) {
if_any(scale, self$scale(throttle = throttle), self$collect())
pop = function(scale = TRUE, controllers = NULL) {
if_any(scale, self$scale(throttle = TRUE), self$collect())
out <- NULL
if (length(self$results) > 0L) {
task <- self$results[[1L]]
Expand Down Expand Up @@ -469,20 +449,23 @@ crew_class_controller <- R6::R6Class(
#' @param seconds_interval Number of seconds to wait between polling
#' intervals waiting for tasks.
#' @param seconds_timeout Timeout length in seconds waiting for tasks.
#' @param scale Logical of length 1, whether to auto-scale workers
#' to meet the demand of the task load.
#' @param controllers Not used. Included to ensure the signature is
#' compatible with the analogous method of controller groups.
wait = function(
mode = "all",
seconds_interval = 0.01,
seconds_timeout = Inf,
scale = TRUE,
controllers = NULL
) {
mode <- as.character(mode)
crew_assert(mode, identical(., "all") || identical(., "one"))
tryCatch(
crew_retry(
fun = ~{
self$scale(throttle = TRUE)
if_any(scale, self$scale(throttle = TRUE), self$collect())
empty_queue <- length(self$queue) < 1L
empty_results <- length(self$results) < 1L
(empty_queue && empty_results) || if_any(
Expand Down Expand Up @@ -606,20 +589,6 @@ crew_class_controller <- R6::R6Class(
)
)

controller_demand <- function(tasks, workers) {
max(0L, tasks - workers)
}

controller_n_new_workers <- function(demand, auto_scale, max) {
out <- switch(
auto_scale,
demand = demand,
one = min(1L, demand),
none = 0L
) %|||% 0L
min(out, max)
}

is_inactive <- function(daemons, launching) {
connected <- as.logical(daemons[, "online"] > 0L)
discovered <- as.logical(daemons[, "instance"] > 0L)
Expand Down
13 changes: 4 additions & 9 deletions R/crew_controller_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ crew_controller_local <- function(
workers = 1L,
host = NULL,
port = NULL,
seconds_launch = 30,
seconds_interval = 0.5,
seconds_interval = 0.25,
seconds_timeout = 10,
seconds_launch = 30,
seconds_idle = Inf,
seconds_wall = Inf,
seconds_exit = 1,
Expand All @@ -31,8 +31,7 @@ crew_controller_local <- function(
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
auto_scale = "demand"
garbage_collection = FALSE
) {
router <- crew_router(
name = name,
Expand All @@ -55,11 +54,7 @@ crew_controller_local <- function(
reset_options = reset_options,
garbage_collection = garbage_collection
)
controller <- crew_controller(
router = router,
launcher = launcher,
auto_scale = auto_scale
)
controller <- crew_controller(router = router, launcher = launcher)
controller$validate()
controller
}
2 changes: 1 addition & 1 deletion R/crew_router.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ crew_router <- function(
workers = 1L,
host = NULL,
port = NULL,
seconds_interval = 0.5,
seconds_interval = 0.25,
seconds_timeout = 10
) {
name <- as.character(name %|||% crew_random_name())
Expand Down
1 change: 1 addition & 0 deletions man/crew_assert.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d4b5274

Please sign in to comment.