Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite of how to count threads used by current future plan. #421

Merged
merged 1 commit into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ Authors@R: c(
person("Florian", "De Boissieu", email = "", role = ("ctb"), comment = "Fixed bugs and improved catalog features"),
person("Andrew", "Sánchez Meador", email = "", role = ("ctb"), comment = "Implemented wing2015() for segment_snags()"),
person("Bourdon", "Jean-François", email = "", role = ("ctb"), comment = "Contributed to Roussel2020() for track_sensor()"),
person("Gatziolis", "Demetrios", email = "", role = ("ctb"), comment = "Implemented Gatziolis2019() for track_sensor()"))
person("Gatziolis", "Demetrios", email = "", role = ("ctb"), comment = "Implemented Gatziolis2019() for track_sensor()"),
person("Leon", "Steinmeier", email = "[email protected]", role = ("ctb"), comment = "Contributed to parallelization management"))
Description: Airborne LiDAR (Light Detection and Ranging) interface for data
manipulation and visualization. Read/write 'las' and 'laz' files, computation
of metrics in area based approach, point filtering, artificial point reduction,
Expand Down
145 changes: 102 additions & 43 deletions R/utils_threads.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,75 +60,134 @@ get_lidr_threads = function()
return(LIDRTHREADS$n)
}

get_future_workers = function()
#' Counts the number of cores used by simple multi-core future::plan()s
#'
#' lidR employs two different parallelization mechanisms: OpenMP for
#' multi-threaded execution of certain algorithms and the future API for
#' processing LAScatalog tiles in parallel. Since the future API can be used for
#' different kinds of parallelization (including multi-threading but also
#' parallel execution on computer clusters), it is possible for OpenMP and
#' future to have incompatible settings. Example: A user has a single machine
#' with four cores and sets both, the number of OpenMP threads and the number of
#' future processes to four. In this case OpenMP has to be disabled so that it
#' doesn't interfere with the future parallelization.
#'
#' Essentially, this function analyzes the argument to the 'workers' parameter
#' of future::plan(). When this argument is either 1) an integer, 2) a character
#' vector of strings all indicating the "localhost", or 3) a function call which
#' returns 1) or 2), then the integer or the number of localhost strings is
#' returned as the number of used cores. Currently, other cases cannot be
#' evaluated.
#'
#' @return A single integer giving the number of cores or NULL if the current
#' future::plan() could not be evaluated. The latter might be the case when a
#' future::plan() involves complex architectures such as remote computers or
#' multiple nodes on a HPC.
try_to_get_num_future_cores = function()
{
# If NULL returned it means that I'm not able to know if the plan involves local workers
# or complex architecture such as remote computers or multiple node on a HPC

# nocov start

strategy <- future::plan()
n <- formals(strategy)$workers
# get the current plan
plan <- future::plan()

# if the plan is explicitly single-threaded, return 1
if (is(plan, "uniprocess")) { return(1L) }
# "uniprocess" covers future::sequential and future::transparent and hopefully
# also single-threaded plans provided by other packages

# if there is no argument called "workers", return NULL
if (!"workers" %in% names(formals(plan))) { return(NULL) }

# get the value of the "workers" argument
workers_arg <- formals(plan)$workers

# These values might be used to reference the current machine and are compared
# to the "workers" argument in the code below
localhosts <- c("localhost", "127.0.0.1", Sys.info()[["nodename"]])
# taken from the documentation of parallelly::makeClusterPSOCK and
# parallelly::makeNodePSOCK hoping that they will also be valid for other
# interfaces

if (is(strategy, "sequential"))
# Now check whether a number of locally used cores can be extracted from the
# "workers" argument

# if "workers" is a positive number, return that number
if (is.numeric(workers_arg) &&
length(workers_arg) == 1L &&
!is.na(workers_arg) &&
workers_arg >= 1)
{
verbose("Parallel strategy: sequential")
return(1L)
return(as.integer(workers_arg))
}

if (is(strategy, "remote")) # e.g. plan(remote(), workers = "localhost")
# if "workers" is a character vector of "localhosts", return the vector length
if (is.character(workers_arg) &&
length(workers_arg) >= 1L &&
all(!is.na(workers_arg)) &&
all(workers_arg %in% localhosts))
{
verbose("Parallel strategy: remote")
return(1L)
return(length(workers_arg))
}

if (is(strategy, "multicore")) # e.g. plan(multicore)
# if "workers" is a function call, check whether the return value is one of
# the above tested options
if (is.call(workers_arg))
{
verbose("Parallel strategy: multicore")

if (is.numeric(n)) # e.g. plan(multicore, workers = 2L)
return(n)
# try to evaluate the function call
evaluated_workers_arg <- tryCatch(
eval(workers_arg),
error = function(dummy) { return(NULL) } # on error: return NULL
)

if (is.call(n)) # e.g. plan(multisession or plan(cluster)
if (is.null(evaluated_workers_arg)) # if the "normal" evaluation didn't work
{
n <- eval(n)
if (is.numeric(n))
return(n)
else
return(NULL)
# Try to evaluate the function call in the environment of the future
# package. This deals with situations where functions cannot be evaluated
# because the future package has not been loaded (using library(future),
# library(lidR), or others).
evaluated_workers_arg <- tryCatch(
# use an arbitrary future function to get the environment of the future package
eval(workers_arg, envir = environment(future::plan)),
error = function(dummy) { return(NULL) }
)
}
}

if (is(strategy, "cluster")) # e.g. plan(multisession) or plan(cluster) or plan(cluster, workers = makeCluster(3, type='SOCK'))
{
verbose("Parallel strategy: cluster")
# if the call could not be evaluated, return NULL
if (is.null(evaluated_workers_arg)) { return(NULL) }

if (is.numeric(n)) # e.g. plan(multisession, workers = 2L)
return(n)
# If the call could be evaluated, check if the result is a number or a
# vector of localhosts.
if (is.numeric(evaluated_workers_arg) &&
length(evaluated_workers_arg) == 1L &&
!is.na(evaluated_workers_arg) &&
evaluated_workers_arg >= 1)
{
return(as.integer(evaluated_workers_arg))
}

if (is.call(n)) # e.g. plan(multisession) or plan(cluster)
if (is.character(evaluated_workers_arg) &&
length(evaluated_workers_arg) >= 1L &&
all(!is.na(evaluated_workers_arg)) &&
all(evaluated_workers_arg %in% localhosts))
{
n <- eval(n)
if (is.numeric(n))
return(n)
else
return(NULL)
return(length(evaluated_workers_arg))
}

return(NULL)
}
} # end if (is.call(workers_arg))

# -> "workers" is neither a number, nor a character vector of localhosts,
# nor a function call returning any of these values.
return(NULL)

# nocov end
# nocov end
}


# Because I made some typos and I rebamed stuff and I did
# Because I made some typos and I renamed stuff and I did
# not check the code yet
getThread <- get_lidr_threads
getThreads <- get_lidr_threads
getWorkers <- get_future_workers
getWorkers <- try_to_get_num_future_cores
setThreads <- set_lidr_threads

must_disable_openmp = function()
Expand All @@ -146,9 +205,9 @@ must_disable_openmp = function()

if (is.null(workers))
{
warning("The parallel evaluation strategy was no recognized and lidR does not know if OpenMP should be disabled.
OpenMP has been disabled by security.
Use options(lidR.check.nested.parallelism = FALSE) and set_lidr_threads() for a fine control of parallelism.", call. = FALSE)
warning("The parallel evaluation strategy was not recognized and lidR does not know if OpenMP should be disabled.
OpenMP has been disabled by security.
Use options(lidR.check.nested.parallelism = FALSE) and set_lidr_threads() for a fine control of parallelism.", call. = FALSE)
return(TRUE)
}

Expand Down