diff --git a/DESCRIPTION b/DESCRIPTION index 4debc905..9ed66de9 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -10,7 +10,8 @@ Authors@R: c( person("Florian", "De Boissieu", email = "", role = ("ctb"), comment = "Fixed bugs and improved catalog features"), person("Andrew", "Sánchez Meador", email = "", role = ("ctb"), comment = "Implemented wing2015() for segment_snags()"), person("Bourdon", "Jean-François", email = "", role = ("ctb"), comment = "Contributed to Roussel2020() for track_sensor()"), - person("Gatziolis", "Demetrios", email = "", role = ("ctb"), comment = "Implemented Gatziolis2019() for track_sensor()")) + person("Gatziolis", "Demetrios", email = "", role = ("ctb"), comment = "Implemented Gatziolis2019() for track_sensor()"), + person("Leon", "Steinmeier", email = "Lenostatos@gmx.de", role = ("ctb"), comment = "Contributed to parallelization management")) Description: Airborne LiDAR (Light Detection and Ranging) interface for data manipulation and visualization. Read/write 'las' and 'laz' files, computation of metrics in area based approach, point filtering, artificial point reduction, diff --git a/R/utils_threads.R b/R/utils_threads.R index 991f94a5..12a60de9 100644 --- a/R/utils_threads.R +++ b/R/utils_threads.R @@ -60,75 +60,134 @@ get_lidr_threads = function() return(LIDRTHREADS$n) } -get_future_workers = function() +#' Counts the number of cores used by simple multi-core future::plan()s +#' +#' lidR employs two different parallelization mechanisms: OpenMP for +#' multi-threaded execution of certain algorithms and the future API for +#' processing LAScatalog tiles in parallel. Since the future API can be used for +#' different kinds of parallelization (including multi-threading but also +#' parallel execution on computer clusters), it is possible for OpenMP and +#' future to have incompatible settings. Example: A user has a single machine +#' with four cores and sets both the number of OpenMP threads and the number of +#' future processes to four.
In this case OpenMP has to be disabled so that it +#' doesn't interfere with the future parallelization. +#' +#' Essentially, this function analyzes the argument to the 'workers' parameter +#' of future::plan(). When this argument is either 1) an integer, 2) a character +#' vector of strings all indicating the "localhost", or 3) a function call which +#' returns 1) or 2), then the integer or the number of localhost strings is +#' returned as the number of used cores. Currently, other cases cannot be +#' evaluated. +#' +#' @return A single integer giving the number of cores or NULL if the current +#' future::plan() could not be evaluated. The latter might be the case when a +#' future::plan() involves complex architectures such as remote computers or +#' multiple nodes on an HPC. +try_to_get_num_future_cores = function() { - # If NULL returned it means that I'm not able to know if the plan involves local workers - # or complex architecture such as remote computers or multiple node on a HPC - # nocov start - strategy <- future::plan() - n <- formals(strategy)$workers + # get the current plan + plan <- future::plan() + + # if the plan is explicitly single-threaded, return 1 + if (is(plan, "uniprocess")) { return(1L) } + # "uniprocess" covers future::sequential and future::transparent and hopefully + # also single-threaded plans provided by other packages + + # if there is no argument called "workers", return NULL + if (!"workers" %in% names(formals(plan))) { return(NULL) } + + # get the value of the "workers" argument + workers_arg <- formals(plan)$workers + + # These values might be used to reference the current machine and are compared + # to the "workers" argument in the code below + localhosts <- c("localhost", "127.0.0.1", Sys.info()[["nodename"]]) + # taken from the documentation of parallelly::makeClusterPSOCK and + # parallelly::makeNodePSOCK hoping that they will also be valid for other + # interfaces - if (is(strategy, "sequential")) + # Now check whether a
number of locally used cores can be extracted from the + # "workers" argument + + # if "workers" is a positive number, return that number + if (is.numeric(workers_arg) && + length(workers_arg) == 1L && + !is.na(workers_arg) && + workers_arg >= 1) { - verbose("Parallel strategy: sequential") - return(1L) + return(as.integer(workers_arg)) } - if (is(strategy, "remote")) # e.g. plan(remote(), workers = "localhost") + # if "workers" is a character vector of "localhosts", return the vector length + if (is.character(workers_arg) && + length(workers_arg) >= 1L && + all(!is.na(workers_arg)) && + all(workers_arg %in% localhosts)) { - verbose("Parallel strategy: remote") - return(1L) + return(length(workers_arg)) } - if (is(strategy, "multicore")) # e.g. plan(multicore) + # if "workers" is a function call, check whether the return value is one of + # the above tested options + if (is.call(workers_arg)) { - verbose("Parallel strategy: multicore") - - if (is.numeric(n)) # e.g. plan(multicore, workers = 2L) - return(n) + # try to evaluate the function call + evaluated_workers_arg <- tryCatch( + eval(workers_arg), + error = function(dummy) { return(NULL) } # on error: return NULL + ) - if (is.call(n)) # e.g. plan(multisession or plan(cluster) + if (is.null(evaluated_workers_arg)) # if the "normal" evaluation didn't work { - n <- eval(n) - if (is.numeric(n)) - return(n) - else - return(NULL) + # Try to evaluate the function call in the environment of the future + # package. This deals with situations where functions cannot be evaluated + # because the future package has not been loaded (using library(future), + # library(lidR), or others). + evaluated_workers_arg <- tryCatch( + # use an arbitrary future function to get the environment of the future package + eval(workers_arg, envir = environment(future::plan)), + error = function(dummy) { return(NULL) } + ) } - } - if (is(strategy, "cluster")) # e.g. 
plan(multisession) or plan(cluster) or plan(cluster, workers = makeCluster(3, type='SOCK')) - { - verbose("Parallel strategy: cluster") + # if the call could not be evaluated, return NULL + if (is.null(evaluated_workers_arg)) { return(NULL) } - if (is.numeric(n)) # e.g. plan(multisession, workers = 2L) - return(n) + # If the call could be evaluated, check if the result is a number or a + # vector of localhosts. + if (is.numeric(evaluated_workers_arg) && + length(evaluated_workers_arg) == 1L && + !is.na(evaluated_workers_arg) && + evaluated_workers_arg >= 1) + { + return(as.integer(evaluated_workers_arg)) + } - if (is.call(n)) # e.g. plan(multisession) or plan(cluster) + if (is.character(evaluated_workers_arg) && + length(evaluated_workers_arg) >= 1L && + all(!is.na(evaluated_workers_arg)) && + all(evaluated_workers_arg %in% localhosts)) { - n <- eval(n) - if (is.numeric(n)) - return(n) - else - return(NULL) + return(length(evaluated_workers_arg)) } - return(NULL) - } + } # end if (is.call(workers_arg)) + # -> "workers" is neither a number, nor a character vector of localhosts, + # nor a function call returning any of these values. return(NULL) - # nocov end + # nocov end } -# Because I made some typos and I rebamed stuff and I did +# Because I made some typos and I renamed stuff and I did # not check the code yet getThread <- get_lidr_threads getThreads <- get_lidr_threads -getWorkers <- get_future_workers +getWorkers <- try_to_get_num_future_cores setThreads <- set_lidr_threads must_disable_openmp = function() @@ -146,9 +205,9 @@ must_disable_openmp = function() if (is.null(workers)) { - warning("The parallel evaluation strategy was no recognized and lidR does not know if OpenMP should be disabled. -OpenMP has been disabled by security. -Use options(lidR.check.nested.parallelism = FALSE) and set_lidr_threads() for a fine control of parallelism.", call. 
= FALSE) + warning("The parallel evaluation strategy was not recognized and lidR does not know if OpenMP should be disabled. + OpenMP has been disabled by security. + Use options(lidR.check.nested.parallelism = FALSE) and set_lidr_threads() for a fine control of parallelism.", call. = FALSE) return(TRUE) }