Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve object cleanup #145

Merged
merged 13 commits into from
Jan 28, 2023
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Imports:
DBI,
later (>= 1.0.0),
R6,
rlang
rlang (>= 1.0.0)
Suggests:
covr,
dbplyr,
Expand Down
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# pool (development version)

* Pool errors and warnings have been reviewed with an eye to making them
more immediately actionable (#145).

* Added support for SAP HANA databases (@marcosci, #103).

# pool 0.1.6
Expand Down
2 changes: 1 addition & 1 deletion R/DBI.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ dbPool <- function(drv,
# Force dots
dots <- list(...)
if (length(dots) > 0 && !is_named(dots)) {
stop("All arguments to `dbPool` must be named")
abort("All arguments to `dbPool` must be named")
}

state <- new.env(parent = emptyenv())
Expand Down
3 changes: 2 additions & 1 deletion R/pool-methods.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ poolCreate <- function(factory,
maxSize,
idleTimeout,
validationInterval,
state
state,
error_call = current_env()
)
}

Expand Down
159 changes: 96 additions & 63 deletions R/pool.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ Pool <- R6::R6Class("Pool",

## initialize the pool with min number of objects
initialize = function(factory, minSize, maxSize,
idleTimeout, validationInterval, state) {
idleTimeout, validationInterval, state, error_call = caller_env()) {
self$valid <- TRUE

self$counters <- new.env(parent = emptyenv())
self$counters$free <- 0
self$counters$taken <- 0
private$idCounter <- 1

if (!is.function(factory)) {
abort("`factory` must be a function.", call = error_call)
}
private$factory <- factory
self$minSize <- minSize
self$maxSize <- maxSize
Expand All @@ -31,14 +34,17 @@ Pool <- R6::R6Class("Pool",
private$freeObjects <- new.env(parent = emptyenv())

for (i in seq_len(self$minSize)) {
private$createObject()
private$createObject(error_call = error_call)
}
},

## calls activate and returns an object
fetch = function() {
fetch = function(error_call = caller_env()) {
if (!self$valid) {
stop("This pool is no longer valid. Cannot fetch new objects.")
abort(
"This pool is no longer valid. Cannot fetch new objects.",
call = error_call
)
}

## see if there's any free objects
Expand All @@ -52,12 +58,12 @@ Pool <- R6::R6Class("Pool",
} else {
## if we get here, there are no free objects
## and we must create a new one
object <- private$createObject()
object <- private$createObject(error_call = error_call)
}

private$cancelScheduledTask(object, "validateHandle")
## call onActivate, onValidate and change object status
object <- private$checkValid(object)
object <- private$checkValid(object, error_call = error_call)
private$changeObjectStatus(object, "taken")

return(object)
Expand All @@ -66,13 +72,13 @@ Pool <- R6::R6Class("Pool",
## passivates the object and returns it back to the pool
## (sets up task to destroy the object if the number of
## total objects exceeds the minimum)
release = function(object) {
release = function(object, error_call = caller_env()) {
pool_metadata <- attr(object, "pool_metadata", exact = TRUE)
if (pool_metadata$state == "free") {
stop("This object was already returned to the pool.")
abort("This object was already returned to the pool.", call = error_call)
}
if (is.null(pool_metadata) || !pool_metadata$valid) {
stop("Invalid object.")
abort("Invalid object.", call = error_call)
}
## immediately destroy object if pool has already been closed
if (!self$valid) {
Expand All @@ -85,14 +91,19 @@ Pool <- R6::R6Class("Pool",
onPassivate(object)
}, error = function(e) {
private$changeObjectStatus(object, NULL)
stop("Object could not be returned back to the pool. ",
"It was destroyed instead. Error message: ",
conditionMessage(e))
abort(
c(
"Object could not be returned back to the pool.",
"It was destroyed instead"
),
call = error_call,
parent = e
)
})

## set up a task to destroy the object after `idleTimeout`
## secs, if we're over the minimum number of objects
taskHandle <- scheduleTask(
taskHandle <- later::later(
function() {
if (self$counters$free + self$counters$taken > self$minSize) {
private$changeObjectStatus(object, NULL)
Expand All @@ -119,7 +130,7 @@ Pool <- R6::R6Class("Pool",
## immediately destroy them). Objects can no longer be
## checked out from the pool.
close = function() {
if (!self$valid) stop("The pool was already closed.")
if (!self$valid) abort("The pool was already closed.")

self$valid <- FALSE
freeEnv <- private$freeObjects
Expand All @@ -132,11 +143,10 @@ Pool <- R6::R6Class("Pool",

# check if there are taken objects
if (self$counters$taken > 0) {
warning("You still have checked out objects. Return ",
"them to the pool so they can be destroyed. ",
"(If these are leaked objects - no reference ",
"- they will be destroyed the next time the ",
"garbage collector runs).", call. = FALSE)
pool_warn(c(
"You still have checked out objects.",
"Use `poolReturn()` them to the pool so they can be destroyed."
))
}
}
),
Expand All @@ -149,16 +159,20 @@ Pool <- R6::R6Class("Pool",

## creates an object, assigns it to the
## free environment and returns it
createObject = function() {
createObject = function(error_call = parent.frame()) {
if (self$counters$free + self$counters$taken >= self$maxSize) {
stop("Maximum number of objects in pool has been reached")
abort("Maximum number of objects in pool has been reached", call = error_call)
}

object <- private$factory()
if (is.null(object)) {
stop("Object creation was not successful. The `factory` ",
"argument must be a function that creates and ",
"returns the object to be pooled.")
abort(
c(
"Object creation failed.",
"The `factory` must not return `NULL`"
),
call = error_call
)
}

## attach metadata about the object
Expand All @@ -176,7 +190,10 @@ Pool <- R6::R6Class("Pool",
## detect leaked connections and destroy them
reg.finalizer(pool_metadata, function(e) {
if (pool_metadata$valid) {
warning("You have a leaked pooled object.")
pool_warn(c(
"Checked-out object deleted before being returned.",
"Make sure to `poolReturn()` all objects retrieved with `poolCheckout().`"
))
}
}, onexit = TRUE)

Expand All @@ -189,19 +206,19 @@ Pool <- R6::R6Class("Pool",
tryCatch({
pool_metadata <- attr(object, "pool_metadata", exact = TRUE)
if (!pool_metadata$valid) {
warning("Object was destroyed twice.")
pool_warn("Object was destroyed twice.")
return()
}
pool_metadata$valid <- FALSE
private$cancelScheduledTask(object, "validateHandle")
private$cancelScheduledTask(object, "destroyHandle")
onDestroy(object)
}, error = function(e) {
warning("Object of class ", is(object)[1],
" could not be destroyed properly, ",
"but was successfully removed from pool. ",
"Error message: ", conditionMessage(e))

pool_warn(c(
"Object could not be destroyed, but was removed from the pool.",
"Error message:",
prefix(conditionMessage(e), " ")
))
})
},

Expand All @@ -224,7 +241,7 @@ Pool <- R6::R6Class("Pool",
if (exists(id, envir = removeFrom)) {
rm(list = id, envir = removeFrom)
} else {
stop("The object could not be found.")
abort("Object could not be found.")
}
}
self$counters[[from]] <- self$counters[[from]] - 1
Expand Down Expand Up @@ -256,40 +273,42 @@ Pool <- R6::R6Class("Pool",
}
},

## try to validate + activate an object; if that fails,
## destroy the object and run whatever more cleanup is
## necessary (provided through `errorFun`)
checkValidTemplate = function(object, errorFun) {
tryCatch({
onActivate(object)
private$validate(object)
return(object)
## tries to validate + activate the object; if that fails,
## warn, destroy that object and try once more
## if second attempt fails, throw an error
checkValid = function(object, error_call = caller_env()) {
tryCatch(
{
private$activateAndValidate(object)
return(object)
},
error = function(e) {}
)

}, error = function(e) {
private$changeObjectStatus(object, NULL)
errorFun(e)
})
pool_warn(c(
"Failed to activate and/or validate existing object.",
"Trying again with a new object."
))
private$changeObjectStatus(object, NULL)
object <- private$createObject()

withCallingHandlers(
private$activateAndValidate(object),
error = function(e) {
private$changeObjectStatus(object, NULL)
abort(
"Freshly created object does not appear to be valid.",
call = error_call,
parent = e
)
}
)
object
},

## tries to validate + activate the object; if that fails,
## the first time around, warn, destroy that object and try
## again with a new object; **returns** the object
## if both tries fail, throw an error
checkValid = function(object) {
object <- private$checkValidTemplate(object,
function(e) {
warning("It wasn't possible to activate and/or validate ",
"the object. Trying again with a new object.",
call. = FALSE)

private$checkValidTemplate(private$createObject(),
function(e) {
stop("Object does not appear to be valid. ",
"Error message: ", conditionMessage(e),
call. = FALSE)
})
})
return(object)
activateAndValidate = function(object) {
onActivate(object)
private$validate(object)
},

## run onValidate on the object only if over `validationInterval`
Expand All @@ -312,3 +331,17 @@ Pool <- R6::R6Class("Pool",
}
)
)


pool_warn <- function(messages) {
file <- if (is_testing()) stdout() else stderr()

out <- paste0(messages, "\n", collapse = "")
cat(prefix(out, "<pool> "), file = file)
}
prefix <- function(x, prefix) {
gsub("(?m)^", prefix, x, perl = TRUE)
}
is_testing <- function() {
identical(Sys.getenv("TESTTHAT"), "true")
}
27 changes: 2 additions & 25 deletions R/scheduler.R
Original file line number Diff line number Diff line change
@@ -1,26 +1,3 @@
## Used in the Pool class to schedule and cancel tasks (based on `later`)
scheduleTask <- function(func, delay) {
force(func)
cancel <- later::later(function() {
# Make sure warn is at least 1 so that warnings are emitted immediately.
# (warn=2 is also OK, for use in debugging.)
warn_level <- getOption("warn")
if (is.numeric(warn_level) && !is.na(warn_level) && warn_level < 1) {
op <- options(warn = 1)
on.exit(options(op), add = TRUE)
}
if (!is.null(func))
func()
}, delay)

## return value is a function that cancel the task, so the user can
## cancel the task by calling the return value of `scheduleTask`. E.g:
## > cancel <- scheduleTaskRecurring(function() print("hello"), 1)
## [1] "hello"
## [1] "hello"
cancel
}

## Used in the Pool class. This function builds on top of `scheduleTask`
## to schedule recurring tasks. It uses the same mechanics: the return
## value is a function that cancels the scheduling of future tasks.
Expand All @@ -30,9 +7,9 @@ scheduleTaskRecurring <- function(func, delay) {
func2 <- function() {
func()
if (!cancelled)
handle <<- scheduleTask(func2, delay)
handle <<- later::later(func2, delay)
}
handle <- scheduleTask(func2, delay)
handle <- later::later(func2, delay)

function() {
cancelled <<- TRUE
Expand Down
24 changes: 24 additions & 0 deletions tests/testthat/_snaps/create-destroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# createObject throws if `factory` throws or returns NULL

Code
poolCreate(MockPooledObj)
Condition
Error in `poolCreate()`:
! `factory` must be a function.
Code
poolCreate(function(x) NULL)
Condition
Error in `poolCreate()`:
! Object creation failed.
* The `factory` must not return `NULL`

# useful warning if onDestroy fails

Code
poolReturn(b)
later::run_now()
Output
<pool> Object could not be destroyed, but was removed from the pool.
<pool> Error message:
<pool> Destruction failed...

Loading