Skip to content

Commit

Permalink
Improve object cleanup (#145)
Browse files Browse the repository at this point in the history
* Switch from `stop()` to `abort()` and pass along `error_call` to ensure
  errors point to user facing functions.
  
* Switch from `warning()` to `pool_warn()`, which users `file(cat = stderr)`
  because condition handling isn't avaiable in `reg.finalizer()` or
  `later::later()`.
  
* Where appropriate, replace `tryCatch()` with `withCallingHandlers()` to 
  get better tracebacks.
  • Loading branch information
hadley authored Jan 28, 2023
1 parent cfe18af commit d777a58
Show file tree
Hide file tree
Showing 14 changed files with 311 additions and 182 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Imports:
DBI,
later (>= 1.0.0),
R6,
rlang
rlang (>= 1.0.0)
Suggests:
covr,
dbplyr,
Expand Down
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# pool (development version)

* Pool errors and warnings have been reviewed with an eye to making them
more immediately actionable (#145).

* Added support for SAP HANA databases (@marcosci, #103).

# pool 0.1.6
Expand Down
2 changes: 1 addition & 1 deletion R/DBI.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ dbPool <- function(drv,
# Force dots
dots <- list(...)
if (length(dots) > 0 && !is_named(dots)) {
stop("All arguments to `dbPool` must be named")
abort("All arguments to `dbPool` must be named")
}

state <- new.env(parent = emptyenv())
Expand Down
3 changes: 2 additions & 1 deletion R/pool-methods.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ poolCreate <- function(factory,
maxSize,
idleTimeout,
validationInterval,
state
state,
error_call = current_env()
)
}

Expand Down
159 changes: 96 additions & 63 deletions R/pool.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ Pool <- R6::R6Class("Pool",

## initialize the pool with min number of objects
initialize = function(factory, minSize, maxSize,
idleTimeout, validationInterval, state) {
idleTimeout, validationInterval, state, error_call = caller_env()) {
self$valid <- TRUE

self$counters <- new.env(parent = emptyenv())
self$counters$free <- 0
self$counters$taken <- 0
private$idCounter <- 1

if (!is.function(factory)) {
abort("`factory` must be a function.", call = error_call)
}
private$factory <- factory
self$minSize <- minSize
self$maxSize <- maxSize
Expand All @@ -31,14 +34,17 @@ Pool <- R6::R6Class("Pool",
private$freeObjects <- new.env(parent = emptyenv())

for (i in seq_len(self$minSize)) {
private$createObject()
private$createObject(error_call = error_call)
}
},

## calls activate and returns an object
fetch = function() {
fetch = function(error_call = caller_env()) {
if (!self$valid) {
stop("This pool is no longer valid. Cannot fetch new objects.")
abort(
"This pool is no longer valid. Cannot fetch new objects.",
call = error_call
)
}

## see if there's any free objects
Expand All @@ -52,12 +58,12 @@ Pool <- R6::R6Class("Pool",
} else {
## if we get here, there are no free objects
## and we must create a new one
object <- private$createObject()
object <- private$createObject(error_call = error_call)
}

private$cancelScheduledTask(object, "validateHandle")
## call onActivate, onValidate and change object status
object <- private$checkValid(object)
object <- private$checkValid(object, error_call = error_call)
private$changeObjectStatus(object, "taken")

return(object)
Expand All @@ -66,13 +72,13 @@ Pool <- R6::R6Class("Pool",
## passivates the object and returns it back to the pool
## (sets up task to destroy the object if the number of
## total objects exceeds the minimum)
release = function(object) {
release = function(object, error_call = caller_env()) {
pool_metadata <- attr(object, "pool_metadata", exact = TRUE)
if (pool_metadata$state == "free") {
stop("This object was already returned to the pool.")
abort("This object was already returned to the pool.", call = error_call)
}
if (is.null(pool_metadata) || !pool_metadata$valid) {
stop("Invalid object.")
abort("Invalid object.", call = error_call)
}
## immediately destroy object if pool has already been closed
if (!self$valid) {
Expand All @@ -85,14 +91,19 @@ Pool <- R6::R6Class("Pool",
onPassivate(object)
}, error = function(e) {
private$changeObjectStatus(object, NULL)
stop("Object could not be returned back to the pool. ",
"It was destroyed instead. Error message: ",
conditionMessage(e))
abort(
c(
"Object could not be returned back to the pool.",
"It was destroyed instead"
),
call = error_call,
parent = e
)
})

## set up a task to destroy the object after `idleTimeout`
## secs, if we're over the minimum number of objects
taskHandle <- scheduleTask(
taskHandle <- later::later(
function() {
if (self$counters$free + self$counters$taken > self$minSize) {
private$changeObjectStatus(object, NULL)
Expand All @@ -119,7 +130,7 @@ Pool <- R6::R6Class("Pool",
## immediately destroy them). Objects can no longer be
## checked out from the pool.
close = function() {
if (!self$valid) stop("The pool was already closed.")
if (!self$valid) abort("The pool was already closed.")

self$valid <- FALSE
freeEnv <- private$freeObjects
Expand All @@ -132,11 +143,10 @@ Pool <- R6::R6Class("Pool",

# check if there are taken objects
if (self$counters$taken > 0) {
warning("You still have checked out objects. Return ",
"them to the pool so they can be destroyed. ",
"(If these are leaked objects - no reference ",
"- they will be destroyed the next time the ",
"garbage collector runs).", call. = FALSE)
pool_warn(c(
"You still have checked out objects.",
"Use `poolReturn()` them to the pool so they can be destroyed."
))
}
}
),
Expand All @@ -149,16 +159,20 @@ Pool <- R6::R6Class("Pool",

## creates an object, assigns it to the
## free environment and returns it
createObject = function() {
createObject = function(error_call = parent.frame()) {
if (self$counters$free + self$counters$taken >= self$maxSize) {
stop("Maximum number of objects in pool has been reached")
abort("Maximum number of objects in pool has been reached", call = error_call)
}

object <- private$factory()
if (is.null(object)) {
stop("Object creation was not successful. The `factory` ",
"argument must be a function that creates and ",
"returns the object to be pooled.")
abort(
c(
"Object creation failed.",
"The `factory` must not return `NULL`"
),
call = error_call
)
}

## attach metadata about the object
Expand All @@ -176,7 +190,10 @@ Pool <- R6::R6Class("Pool",
## detect leaked connections and destroy them
reg.finalizer(pool_metadata, function(e) {
if (pool_metadata$valid) {
warning("You have a leaked pooled object.")
pool_warn(c(
"Checked-out object deleted before being returned.",
"Make sure to `poolReturn()` all objects retrieved with `poolCheckout().`"
))
}
}, onexit = TRUE)

Expand All @@ -189,19 +206,19 @@ Pool <- R6::R6Class("Pool",
tryCatch({
pool_metadata <- attr(object, "pool_metadata", exact = TRUE)
if (!pool_metadata$valid) {
warning("Object was destroyed twice.")
pool_warn("Object was destroyed twice.")
return()
}
pool_metadata$valid <- FALSE
private$cancelScheduledTask(object, "validateHandle")
private$cancelScheduledTask(object, "destroyHandle")
onDestroy(object)
}, error = function(e) {
warning("Object of class ", is(object)[1],
" could not be destroyed properly, ",
"but was successfully removed from pool. ",
"Error message: ", conditionMessage(e))

pool_warn(c(
"Object could not be destroyed, but was removed from the pool.",
"Error message:",
prefix(conditionMessage(e), " ")
))
})
},

Expand All @@ -224,7 +241,7 @@ Pool <- R6::R6Class("Pool",
if (exists(id, envir = removeFrom)) {
rm(list = id, envir = removeFrom)
} else {
stop("The object could not be found.")
abort("Object could not be found.")
}
}
self$counters[[from]] <- self$counters[[from]] - 1
Expand Down Expand Up @@ -256,40 +273,42 @@ Pool <- R6::R6Class("Pool",
}
},

## try to validate + activate an object; if that fails,
## destroy the object and run whatever more cleanup is
## necessary (provided through `errorFun`)
checkValidTemplate = function(object, errorFun) {
tryCatch({
onActivate(object)
private$validate(object)
return(object)
## tries to validate + activate the object; if that fails,
## warn, destroy that object and try once more
## if second attempt fails, throw an error
checkValid = function(object, error_call = caller_env()) {
tryCatch(
{
private$activateAndValidate(object)
return(object)
},
error = function(e) {}
)

}, error = function(e) {
private$changeObjectStatus(object, NULL)
errorFun(e)
})
pool_warn(c(
"Failed to activate and/or validate existing object.",
"Trying again with a new object."
))
private$changeObjectStatus(object, NULL)
object <- private$createObject()

withCallingHandlers(
private$activateAndValidate(object),
error = function(e) {
private$changeObjectStatus(object, NULL)
abort(
"Freshly created object does not appear to be valid.",
call = error_call,
parent = e
)
}
)
object
},

## tries to validate + activate the object; if that fails,
## the first time around, warn, destroy that object and try
## again with a new object; **returns** the object
## if both tries fail, throw an error
checkValid = function(object) {
object <- private$checkValidTemplate(object,
function(e) {
warning("It wasn't possible to activate and/or validate ",
"the object. Trying again with a new object.",
call. = FALSE)

private$checkValidTemplate(private$createObject(),
function(e) {
stop("Object does not appear to be valid. ",
"Error message: ", conditionMessage(e),
call. = FALSE)
})
})
return(object)
activateAndValidate = function(object) {
onActivate(object)
private$validate(object)
},

## run onValidate on the object only if over `validationInterval`
Expand All @@ -312,3 +331,17 @@ Pool <- R6::R6Class("Pool",
}
)
)


pool_warn <- function(messages) {
file <- if (is_testing()) stdout() else stderr()

out <- paste0(messages, "\n", collapse = "")
cat(prefix(out, "<pool> "), file = file)
}
prefix <- function(x, prefix) {
gsub("(?m)^", prefix, x, perl = TRUE)
}
is_testing <- function() {
identical(Sys.getenv("TESTTHAT"), "true")
}
27 changes: 2 additions & 25 deletions R/scheduler.R
Original file line number Diff line number Diff line change
@@ -1,26 +1,3 @@
## Used in the Pool class to schedule and cancel tasks (based on `later`)
scheduleTask <- function(func, delay) {
force(func)
cancel <- later::later(function() {
# Make sure warn is at least 1 so that warnings are emitted immediately.
# (warn=2 is also OK, for use in debugging.)
warn_level <- getOption("warn")
if (is.numeric(warn_level) && !is.na(warn_level) && warn_level < 1) {
op <- options(warn = 1)
on.exit(options(op), add = TRUE)
}
if (!is.null(func))
func()
}, delay)

## return value is a function that cancel the task, so the user can
## cancel the task by calling the return value of `scheduleTask`. E.g:
## > cancel <- scheduleTaskRecurring(function() print("hello"), 1)
## [1] "hello"
## [1] "hello"
cancel
}

## Used in the Pool class. This function builds on top of `scheduleTask`
## to schedule recurring tasks. It uses the same mechanics: the return
## value is a function that cancels the scheduling of future tasks.
Expand All @@ -30,9 +7,9 @@ scheduleTaskRecurring <- function(func, delay) {
func2 <- function() {
func()
if (!cancelled)
handle <<- scheduleTask(func2, delay)
handle <<- later::later(func2, delay)
}
handle <- scheduleTask(func2, delay)
handle <- later::later(func2, delay)

function() {
cancelled <<- TRUE
Expand Down
24 changes: 24 additions & 0 deletions tests/testthat/_snaps/create-destroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# createObject throws if `factory` throws or returns NULL

Code
poolCreate(MockPooledObj)
Condition
Error in `poolCreate()`:
! `factory` must be a function.
Code
poolCreate(function(x) NULL)
Condition
Error in `poolCreate()`:
! Object creation failed.
* The `factory` must not return `NULL`

# useful warning if onDestroy fails

Code
poolReturn(b)
later::run_now()
Output
<pool> Object could not be destroyed, but was removed from the pool.
<pool> Error message:
<pool> Destruction failed...

Loading

0 comments on commit d777a58

Please sign in to comment.