
Fix error during parallelization with multisession plan #418

Closed
wants to merge 1 commit into from

Conversation

Lenostatos
Copy link
Contributor

Dear JRR,

When I switched my lidR version from 3.1.1 to 3.1.2, some of my code calling catalog_apply threw an error along the lines of "Error in availableWorkers() : could not find function "availableWorkers"".

I was using future::plan(future::multisession) on Kubuntu 18.04.

I tracked the error to the get_future_workers function. There, the current future strategy is fetched via a call to future::plan(). The returned strategy object is a function with an argument called "workers", which get_future_workers extracts. Depending on the strategy, the extracted argument can contain a call to availableWorkers() (and, I think, also a call to availableCores()). When this call is evaluated later in the function, it throws the above-mentioned error.

I fixed the error by evaluating the call in the environment of the availableCores function which is part of the parallelly package since version 1.20.0 from 2020-10-30.

I know nearly nothing about environments! So I'm really unsure whether this is the way to go about this. The call to environment(parallelly::availableCores) returns <environment: namespace:parallelly> in the R console so it looks like it's just the environment of the parallelly package which is what we want there?

Currently, this fix works for me, but it definitely has to be tested on clusters and other environments.
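A minimal sketch of the fix described above, assuming parallelly (>= 1.20.0) is installed; the variable names are illustrative, not lidR's actual internals:

```r
# Evaluate the extracted 'workers' call in the namespace environment of
# parallelly, where availableCores() and availableWorkers() are defined.
future::plan(future::multisession)
strategy <- future::plan()
n <- formals(strategy)$workers  # e.g. the unevaluated call availableCores()
n_workers <- eval(n, envir = environment(parallelly::availableCores))
```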

Cheers,
Leon

…lable to the future package with a multisession plan.

Also update one function call to use the new parallelly package.

This probably needs further testing on clusters and the like!
@Jean-Romain
Copy link
Collaborator

Thank you for reporting. I don't like your solution because it adds a dependency on parallelly, which is not a direct dependency of lidR. Could you please provide a reproducible example so I can check where the error comes from?

@Lenostatos
Copy link
Contributor Author

I noticed that the error does not occur when I load lidR via library(lidR). The reproducible example below is without a call to library(lidR).

The parallelly bits in my code can be swapped with future if you prefer that. I think it's meaningful to use parallelly since it's (kind of) encouraged by future and it's a direct dependency of future and thus an indirect dependency of lidR anyway. But of course I don't know which other implications this would have for lidR.

# Set multisession plan
future::plan(strategy = future::multisession)

# Read LAScatalog
test_catalog <- lidR::readLAScatalog(
  system.file("extdata", "MixedConifer.laz", package = "lidR")
)

# Use function from the examples of catalog_apply
my_tree_detection_method <- function(cluster, ws)
{
  las <- lidR::readLAS(cluster)
  if (lidR::is.empty(las)) return(NULL)
  
  ttops <- lidR::find_trees(las, lidR::lmf(ws))
  
  bbox  <- raster::extent(cluster)
  ttops <- raster::crop(ttops, bbox)
  
  return(ttops)
}

# Set some processing options.
lidR::opt_chunk_buffer(test_catalog) <- 10
lidR::opt_chunk_size(test_catalog)   <- 100            
#> Be careful, a chunk size smaller than 250 is likely to be irrelevant.
lidR::opt_chunk_alignment(test_catalog) <- c(-50, -35) 
lidR::opt_select(test_catalog)       <- "xyz"          
lidR::opt_filter(test_catalog)       <- "-keep_first"  

opt    <- list(automerge   = TRUE)

# Apply the function to the catalog
output <- lidR::catalog_apply(test_catalog,
  my_tree_detection_method,
  ws = 5,
  .options = opt
)
#> Error in availableCores(): could not find function "availableCores"

Created on 2021-03-29 by the reprex package (v1.0.0)

@Jean-Romain
Copy link
Collaborator

I noticed that the error does not occur when I load lidR via library(lidR).

Interesting, this explains why I did not catch it. And it demonstrates the issue is a bit complex.

I think it's meaningful to use parallelly since it's (kind of) encouraged by future

Could you please show me a reference where you read that? I'm not aware of such information.

it's a direct dependency of future and thus an indirect dependency of lidR anyway

Sure, but my goal is to reduce the number of direct dependencies. I'd prefer that we find a fix using future, unless I see a document that explicitly states something about parallelly and future good practices.

@Lenostatos
Copy link
Contributor Author

The documentation of e.g. future::availableCores() says:

For backward-compatible reasons, these functions remain available as exact copies also from this package (as re-exports).

It's probably fine to use future instead of parallelly. Maybe it would even be better if future::plan() returned a strategy object whose workers argument is a call to an actual future function.

I can just put future in the three places where I wrote parallelly and it should work either way.

@Jean-Romain
Copy link
Collaborator

Ok, I have all I need to understand the problem. It comes from the fact that future is not loaded. Minimal reproducible example:

future::plan(future::multisession)
strategy = future::plan()
n <- formals(strategy)$workers
eval(n)
#> Error in availableCores() : 
#>  could not find function "availableCores"

@Jean-Romain
Copy link
Collaborator

Jean-Romain commented Mar 29, 2021

I think we could simply replace

if (is.call(n))
{
   n <- eval(n)
   if (is.numeric(n))
     return(n)
   else
     return(NULL)
 }

by

if (is.call(n)) 
{
   return(future::availableCores())
}

I used eval() because I don't know whether it can be something else.

@Lenostatos
Copy link
Contributor Author

Lenostatos commented Mar 30, 2021

I tried all the different future plans (see reprex below) on my machine (a laptop with an Intel Core i5, 4 cores). The workers argument gives me either NULL or availableCores() in all cases, except for future::cluster, which gives availableWorkers(), and future::remote, which gives me the workers that I passed to it beforehand.

Maybe use future::availableCores() like you suggested for every plan except for future::cluster and future::remote? For those two maybe length(future::availableWorkers()) is more meaningful? I don't have any experience with clusters so I'm not sure if this will work as intended.
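A hedged sketch of that suggestion (the dispatch on the plan class is my assumption; is() comes from the methods package):

```r
# cluster-like plans (including remote) default to availableWorkers(),
# the other multi-process plans default to availableCores(), so dispatch
# on the plan class instead of evaluating the stored call.
strategy <- future::plan()
if (methods::is(strategy, "cluster")) {
  n <- length(future::availableWorkers())
} else {
  n <- as.integer(future::availableCores())
}
```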

# These two functions always return the same regardless of the current plan:
future::availableCores()
#> system 
#>      4
future::availableWorkers()
#> [1] "localhost" "localhost" "localhost" "localhost"

# sequential
future::plan(future::sequential)
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> NULL
eval(n)
#> NULL

# transparent
future::plan(future::transparent)
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> NULL
eval(n)
#> NULL

# multisession
future::plan(future::multisession)
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> availableCores()
eval(n)
#> Error in availableCores(): could not find function "availableCores"

# multicore
future::plan(future::multicore)
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> availableCores(constraints = "multicore")
eval(n)
#> Error in availableCores(constraints = "multicore"): could not find function "availableCores"

# multiprocess (deprecated)
future::plan(future::multiprocess)
#> Warning: Strategy 'multiprocess' is deprecated in future (>= 1.20.0). Instead,
#> explicitly specify either 'multisession' or 'multicore'. In the current R
#> session, 'multiprocess' equals 'multisession'.
#> Warning in supportsMulticoreAndRStudio(...): [ONE-TIME WARNING] Forked
#> processing ('multicore') is not supported when running R from RStudio
#> because it is considered unstable. For more details, how to control forked
#> processing or not, and how to silence this warning in future R sessions, see ?
#> parallelly::supportsMulticore
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> availableCores()
eval(n)
#> Error in availableCores(): could not find function "availableCores"

# cluster
future::plan(future::cluster)
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> availableWorkers()
eval(n)
#> Error in availableWorkers(): could not find function "availableWorkers"

# remote
future::plan(future::remote)
#> Error: 'length(workers) >= 1L' is not TRUE
future::plan(future::remote(workers = c("localhost", "localhost")))
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> [1] "localhost" "localhost"
eval(n)
#> [1] "localhost" "localhost"
# -> I always get the number of "localhosts" which I passed to future::remote
# I can even give more than four "localhosts"

Created on 2021-03-30 by the reprex package (v1.0.0)

@Lenostatos
Copy link
Contributor Author

Okay, I think there might be problems when custom parameters are used with the plans. See the reprex below with custom plans.

Just to make sure that I understood the lidR code correctly: The lidR function get_future_workers is supposed to return the number of to-be-used parallel processing units, right?

# multisession - custom number of workers
future::plan(future::multisession(workers = 2))
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> [1] 2
eval(n)
#> [1] 2
eval(n, envir = environment(future::plan))
#> [1] 2

# cluster - workers from parallel package
future::plan(future::cluster, workers = parallel::makeCluster(3, type='SOCK'))
strategy <- future::plan()
(n <- formals(strategy)$workers)
#> socket cluster with 3 nodes on host 'localhost'
eval(n)
#> socket cluster with 3 nodes on host 'localhost'
eval(n, envir = environment(future::plan))
#> socket cluster with 3 nodes on host 'localhost'

Created on 2021-03-30 by the reprex package (v1.0.0)

@Lenostatos
Copy link
Contributor Author

Currently, for future::remote plans, lidR::get_future_workers does return(1L). But is that the intended behaviour? The reprex above showed that the future::remote plan can entail more than one parallel processing unit, if I understand it correctly.

Since the get_future_workers function uses is(strategy, "plan-name") to detect which plan was selected by the user, I had a look at the output of class(strategy) with different plans (see reprex below). To summarize a bit:

  • Only future::sequential and future::transparent have the class "uniprocess". All other plans have the class "multiprocess", even if they were set up with just one worker.
  • The last four plans at the end of the reprex were customized and have the additional class "tweaked".
  • future::multisession, future::multicore, and future::remote are uniquely identifiable by their class name but future::multiprocess and future::cluster share their class name with other plans.
# Classes of future::plans

# sequential
future::plan(future::sequential)
class(future::plan())
#> [1] "FutureStrategy" "sequential"     "uniprocess"     "future"        
#> [5] "function"

# transparent
future::plan(future::transparent)
class(future::plan())
#> [1] "FutureStrategy" "transparent"    "sequential"     "uniprocess"    
#> [5] "future"         "function"

# multisession
future::plan(future::multisession)
class(future::plan())
#> [1] "FutureStrategy" "multisession"   "cluster"        "multiprocess"  
#> [5] "future"         "function"

# multicore
future::plan(future::multicore)
class(future::plan())
#> [1] "FutureStrategy" "multicore"      "multiprocess"   "future"        
#> [5] "function"

# multiprocess (deprecated)
suppressWarnings(future::plan(future::multiprocess))
class(future::plan())
#> [1] "FutureStrategy" "multiprocess"   "future"         "function"

# cluster
future::plan(future::cluster)
class(future::plan())
#> [1] "FutureStrategy" "cluster"        "multiprocess"   "future"        
#> [5] "function"

# remote
future::plan(future::remote(workers = c("localhost", "localhost")))
class(future::plan())
#> [1] "FutureStrategy" "tweaked"        "remote"         "cluster"       
#> [5] "multiprocess"   "future"         "function"

# remote
future::plan(future::remote(workers = c("localhost")))
class(future::plan())
#> [1] "FutureStrategy" "tweaked"        "remote"         "cluster"       
#> [5] "multiprocess"   "future"         "function"
# -> remote is still "multiprocess" even with just one worker

# multisession - custom number of workers
future::plan(future::multisession(workers = 2))
class(future::plan())
#> [1] "FutureStrategy" "tweaked"        "multisession"   "cluster"       
#> [5] "multiprocess"   "future"         "function"

# cluster - workers from parallel package
future::plan(future::cluster, workers = parallel::makeCluster(3, type='SOCK'))
class(future::plan())
#> [1] "FutureStrategy" "tweaked"        "cluster"        "multiprocess"  
#> [5] "future"         "function"

Created on 2021-03-30 by the reprex package (v1.0.0)

@Jean-Romain
Copy link
Collaborator

Jean-Romain commented Mar 30, 2021

The goal is to evaluate how many workers are used in order to disable OpenMP and protect against nested parallel loops. Many users may do:

set_lidr_threads(8)
plan(multisession, workers = 8L)

This function is called in only one place, in must_disable_openmp. https://github.com/Jean-Romain/lidR/blob/7e9113ec240afda835b53f30d7a2e99ed144e677/R/utils_threads.R#L134 It is used only as a user protection and does nothing else.

In the case of a remote plan it returns 1 because each remote computer is likely to be multicore and the local worker is not used multiple times, as in

plan(remote, workers = c("localhost", "[email protected]", "[email protected]", "[email protected]"))

Each worker is an independent computer so there is no need to disable openmp. But in

plan(future::remote(workers = c("localhost", "localhost")))

this is not necessarily true. Yet I don't see why somebody would do that. You may find other limit cases that are not covered. I think the protection aims at simple plans. If you are using a complex plan on a complex architecture, you must be able to understand the problem and deal with it manually. But the function should not fail, and we must fix the bug you spotted. In the previous version there was another bug for the cluster plan; I improved the evaluation but introduced this unexpected bug.
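The protection logic described here could be sketched roughly like this (an illustrative assumption, not lidR's actual must_disable_openmp body; the function name is mine):

```r
# If the future plan already claims more than one worker, OpenMP
# multi-threading should be switched off to avoid nested parallelism.
# future re-exports availableCores()/availableWorkers(), so evaluating
# the stored call in the future namespace resolves it.
nested_parallelism_likely <- function(openmp_threads)
{
  workers <- tryCatch(
    eval(formals(future::plan())$workers, envir = getNamespace("future")),
    error = function(e) NULL
  )
  if (!is.numeric(workers)) return(FALSE)  # unknown plan: do not guess
  workers > 1L && openmp_threads > 1L
}
```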

@Lenostatos
Copy link
Contributor Author

Based on what I have learned about the different future plans, the possible values of the workers argument and what the get_future_workers function is for, I came up with the following suggestion:

#' Tries to find out how many cores are used by the future::plan() set by the
#' user.
#'
#' @return A single integer giving the number of cores or NULL if this number
#'   could not be evaluated with certainty. The latter might be the case when a
#'   future::plan() involves local workers or complex architectures such as
#'   remote computers or multiple nodes on a HPC.
try_to_get_num_future_cores = function()
{
  # nocov start

  plan <- future::plan()

  if (is(plan, "uniprocess"))
    # covers future::sequential and future::transparent and hopefully also
    # single-threaded plans when they are provided by other packages
  {
    verbose("Parallelization strategy: sequential")
    return(1L)
  }
  else if (
    is(plan, "multisession") || is(plan, "multicore") ||
    identical(class(plan), c("FutureStrategy", "multiprocess", "future", "function"))
  )
    # covers future::multisession and future::multicore and the last condition
    # covers the deprecated future::multiprocess
  {
    verbose("Parallelization strategy: multicore")

    workers_arg <- formals(plan)$workers
    # -> according to the documentation of future::multisession and
    # future::multicore the workers argument may only be either "a positive
    # numeric scalar" or a function which returns such a value.

    if (is.numeric(workers_arg))
    {
      return(workers_arg)
    }
    else if (is.call(workers_arg))
    {
      if (workers_arg == "availableCores()") # the default workers argument
      {
        return(future::availableCores())
        # simply calling "availableCores()" doesn't work when the future package
        # hasn't been loaded
      }
      else
      {
        return(eval(workers_arg))
      }
    }
  }
  else if (is(plan, "cluster"))
    # covers future::cluster and future::remote
  {
    verbose("Parallelization strategy: cluster")

    workers_arg <- formals(plan)$workers
    # -> according to the documentation of future::cluster and future::remote a
    # possible argument for the workers is a "numeric scalar". If that is the
    # case, the parallelly::makeClusterPSOCK() function creates that many
    # "localhost" workers where "localhost" is just another term for "single
    # core".

    if (is.numeric(workers_arg))
    {
      return(workers_arg)
    }
    else if (is.character(workers_arg) && all(workers_arg == "localhost"))
      # if the workers are just one or more localhosts
    {
      return(length(workers_arg))
    }
    else if (is(plan, "remote"))
    {
      verbose("Assuming independent remote machines")
      return(1L)
    }
    else
    {
      return(NULL)
    }
  }
  else
  {
    verbose("Parallelization strategy: unrecognizable by lidR")
    return(NULL)
  }

 # nocov end
}

Created on 2021-04-01 by the reprex package (v1.0.0)

According to the documentation of future::remote:

The remote plan is a very similar to the cluster plan, but provides more convenient default argument values when connecting to remote machines.

so if I understood you correctly, the main reason for treating the remote plan differently from the cluster plan is that we assume that users with independent machines in clusters (or something similar) will commonly use the remote plan rather than the cluster plan?

@Jean-Romain
Copy link
Collaborator

That sounds good. Few notes and you can make a PR

return(eval(workers_arg))

should be something like

tryCatch(eval(workers_arg), error = function(e) NULL)

to prevent triggering the same error. And plan(remote) could accept a similar localhost test to the one used for cluster.

so if I understood you correctly, the main reason for treating the remote plan differently from the cluster plan is that we assume that users with independent machines in clusters (or something similar) will commonly use the remote plan rather than the cluster plan?

The main reason is that I don't really know what is a cluster plan vs remote, and in the previous version (3.1.1) lidR failed with the cluster plan. This was reported by email by somebody trying to run lidR on a supercomputer, so I fixed it this way very recently, and it is prone to improvement. If you think it is not required you can change it. I only ask you to explain your change, so I can be sure to understand your choice and maintain the code later.

@Lenostatos
Copy link
Contributor Author

tryCatch(eval(workers_arg), error = function(e) NULL)

Yeah, good point. I have included that now.

And the plan(remote) can accept similar test than cluster to test localhost.

My suggestion above already performs that localhost test also for remote plans since is(plan, "cluster") is also true for remote plans:

future::plan(future::remote(workers = "localhost"))
is(future::plan(), "cluster")
#> [1] TRUE

Created on 2021-04-02 by the reprex package (v1.0.0)

I don't really know what is a cluster plan vs remote

It seems to me like they are very, very similar. Have a look at these examples from the documentation of future::remote:

## The following setups are equivalent:
plan(remote, workers = "localhost")
plan(cluster, workers = "localhost", persistent = TRUE)
plan(cluster, workers = 1, persistent = TRUE)
plan(multisession, workers = 1, persistent = TRUE)

## The following setups are equivalent:
plan(remote, workers = "remote.server.org")
plan(cluster, workers = "remote.server.org", persistent = TRUE, homogeneous = FALSE)

## The following setups are equivalent:
cl <- makeClusterPSOCK("remote.server.org")
plan(remote, workers = cl)
plan(cluster, workers = cl, persistent = TRUE)

So if even a multisession can be equivalent to a remote plan then this all comes down to guessing how the users use these plans.

I can imagine three different decisions we could make here:

  1. Be defensive: Always turn off OpenMP when the workers are neither a number nor a vector of localhosts. This assumes that more complex settings are likely selected by advanced users who will be able to meaningfully combine OpenMP and future by themselves (using options(lidR.check.nested.parallelism = FALSE)).
  2. Trust users with remote plans: If is(plan, "remote"), assume that the user does not want to use future for multicore parallelization but just for distributing R sessions over independent machines. This assumes that inexperienced users might "accidentally" try multicore parallelization with the cluster plan but most likely not with the remote plan.
  3. Trust users with remote and cluster plans: If is(plan, "cluster"), assume that the user does not want to use future for multicore parallelization and that users who want to enable multicore parallelization will never use cluster or remote plans for that.

I'm kind of in favor of option 1) since it seems to be the most consistent but again, I don't have any real experience with clusters. I'm just hoping that I'm imagining everything correctly enough based on what I've read about the topic 😅 Can you tell me more about the specific error of that other user with the cluster plan?
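Option 1 could look roughly like this (a sketch under the assumptions above, not a final implementation):

```r
# Defensive variant: trust only a plain number of workers or a vector made
# up entirely of "localhost" entries; anything else yields NULL, which the
# caller interprets as "disable OpenMP".
workers_arg <- formals(future::plan())$workers
n <- if (is.numeric(workers_arg)) {
  as.integer(workers_arg)
} else if (is.character(workers_arg) && all(workers_arg == "localhost")) {
  length(workers_arg)
} else {
  NULL
}
```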

I've also come up with a new approach for the get_future_workers function.
Instead of having all these if (is(plan, "some future plan")) checks, the new approach first concentrates on analyzing the workers argument regardless of what the future::plan() is (well, nearly regardless: uniprocess plans are identified at the very beginning). Any other plan-specific logic is moved to the end of the function.

#' Tries to find out how many cores of a single machine are used by the
#' future::plan() set by the user.
#'
#' The future API can be used to parallelize tasks on multiple cores of the same
#' machine. It can also be used to run tasks in parallel on different machines,
#' e.g. in a cluster. The different machines of the cluster might themselves
#' have access to multiple cores. Since some of the lidR functions are already
#' parallelized using OpenMP, this function tries to help with deciding whether
#' the cores used by OpenMP are also demanded by the current future::plan().
#'
#' @return A single integer giving the number of cores or NULL if this number
#'   could not be evaluated with certainty. The latter might be the case when a
#'   future::plan() involves local workers or complex architectures such as
#'   remote computers or multiple nodes on a HPC.
try_to_get_num_future_cores = function()
{
  # nocov start

  res_num_cores <- NULL

  plan <- future::plan()

  # Possible values for the localhost as understood by
  # parallelly::makeClusterPSOCK and parallelly::makeNodePSOCK
  localhosts <- c("localhost", "127.0.0.1", Sys.info()[["nodename"]])

  if (is(plan, "uniprocess"))
    # covers future::sequential and future::transparent and hopefully also
    # single-threaded plans provided by other packages
  {
    res_num_cores <- 1L
  }
  else if ("workers" %in% names(formals(plan)))
    # the plan is potentially multi-threaded and has an argument called "workers"
  {
    # get the value of the "workers" argument
    workers_arg <- formals(plan)$workers

    if (is.numeric(workers_arg)) # "workers" is just a number of cores
    {
      res_num_cores <- workers_arg
    }
    else if (is.character(workers_arg) &
             length(workers_arg) >= 1 &
             all(workers_arg %in% localhosts))
      # "workers" is a character vector containing only one or more localhosts
    {
      res_num_cores <- length(workers_arg)
    }
    else if (is.call(workers_arg)) # "workers" is a function call
    {
      if (workers_arg == "availableCores()")
      {
        res_num_cores <- future::availableCores()
        # simply calling "availableCores()" doesn't work when the future
        # package hasn't been loaded
      }
      else if (workers_arg == "availableWorkers()")
      {
        available_workers <- future::availableWorkers()

        if (is.character(available_workers) &
            length(available_workers) >= 1 &
            all(available_workers %in% localhosts))
          # availableWorkers can also return a character vector containing one
          # or more localhosts
        {
          res_num_cores <- length(available_workers)
        }
      }
      else # "workers" is any other function call
      {
        # try to evaluate the function call
        evaluated_workers_arg <- tryCatch(
          eval(workers_arg),
          error = function(dummy) { return(NULL) }
        )

        if (!is.null(evaluated_workers_arg)) # the call could be evaluated
        {
          # Check if the result is a number or a vector of localhosts. In all
          # other cases res_num_cores keeps the value NULL.
          if (is.numeric(evaluated_workers_arg))
          {
            res_num_cores <- evaluated_workers_arg
          }
          else if (is.character(evaluated_workers_arg) &
                   length(evaluated_workers_arg) >= 1 &
                   all(evaluated_workers_arg %in% localhosts))
          {
            res_num_cores <- length(evaluated_workers_arg)
          }
        }
      }
    }
    else
      # "workers" is neither a number, nor a character vector of localhosts,
      # nor a function call returning any of these values.
    {
      res_num_cores <- NULL
      # res_num_cores should still be NULL at this point but ensure it with
      # this assignment anyway
    }

    # end of checking the "workers" argument of the current future::plan()
  }

  # ensure that res_num_cores is returned as an integer if it is a number
  if (is.numeric(res_num_cores))
  {
    res_num_cores <- as.integer(res_num_cores)
  }

  if (is.null(res_num_cores))
    # check NULL first: comparing NULL with a number would throw an error
  {
    # Here we could e.g. do:

    # if (is(plan, "remote"))
    # {
    #   verbose("Parallelization strategy: remote cluster nodes")
    #   warning("Assuming that cluster nodes are independent machines!")
    #   res_num_cores <- 1L
    # }
    # else
    # {
    verbose("Parallelization strategy: unrecognizable by lidR")
    # }
  }
  else if (res_num_cores == 1L)
  {
    verbose("Parallelization strategy: sequential")
  }
  else if (res_num_cores > 1L)
  {
    verbose("Parallelization strategy: multicore")
  }
  else
  {
    warning("The currently used number of cores seems to be invalid!")
    res_num_cores <- NULL
  }

  return(res_num_cores)
  # nocov end
}

Created on 2021-04-02 by the reprex package (v1.0.0)

@Jean-Romain
Copy link
Collaborator

Jean-Romain commented Apr 2, 2021

That sounds very good !

Can you tell me more about the specific error of that other user with the cluster plan?

In the case of MPI, n <- formals(future::plan())$workers returns a list instead of a vector. But it is not easy to reproduce. It is a case that must be covered; I don't know exactly what the list contains.

cl<-getMPIcluster()
plan(cluster, workers = cl)

In the current code it falls into the return(NULL) case. I think it does in your code too. But to be honest, I'm currently on paternity leave, so I'm reading your code and giving feedback without digging in depth.

@Jean-Romain
Copy link
Collaborator

Can you try to exit as soon as possible to make the code more readable, with fewer nested if-else blocks? e.g.

if (is(plan, "uniprocess")) return(1L)
if (!"workers" %in% names(formals(plan))) {    
    warning("The currently used number of cores seems to be invalid!")
    return(NULL)
}

workers_arg <- formals(plan)$workers
    
if (is.numeric(workers_arg)) 
   return(workers_arg)

if (is.character(workers_arg) & length(workers_arg) >= 1 & all(workers_arg %in% localhosts)) 
   return(length(workers_arg))

if (is.call(workers_arg) && workers_arg == "availableCores()")
   return(future::availableCores())

if (is.call(workers_arg) && workers_arg == "availableWorkers()")
{
   # [...]

@Lenostatos
Copy link
Contributor Author

I tried to create MPI clusters and the result looked like this (don't run the reprex as-is; see the comments):

# Don't run!
# (You will have to stop the cluster nodes by killing them "manually" in the
# Task-Manager (or something comparable).)

## MPI clusters
# Two Nodes
mpi_cluster <- snow::makeMPIcluster(2, type = "MPI")
#> Loading required namespace: Rmpi
#>  2 slaves are spawned successfully. 0 failed.
future::plan(future::cluster(workers = mpi_cluster))
formals(future::plan())$workers
#> [[1]]
#> $rank
#> [1] 1
#>
#> $RECVTAG
#> [1] 33
#>
#> $SENDTAG
#> [1] 22
#>
#> $comm
#> [1] 1
#>
#> attr(,"class")
#> [1] "MPInode"
#>
#> [[2]]
#> $rank
#> [1] 2
#>
#> $RECVTAG
#> [1] 33
#>
#> $SENDTAG
#> [1] 22
#>
#> $comm
#> [1] 1
#>
#> attr(,"class")
#> [1] "MPInode"
#>
#> attr(,"class")
#> [1] "spawnedMPIcluster" "MPIcluster"        "cluster"

# The following apparently only works with OpenMPI 1.6.5 installed
# (see https://stackoverflow.com/questions/41007564/stopcluster-in-r-snow-freeze)
# snow::stopCluster(mpi_cluster)

# One Node (I had to restart R after the two-nodes variant in order for this one to work)
mpi_cluster <- snow::makeMPIcluster(1, type = "MPI")
#> Loading required namespace: Rmpi
#>  1 slaves are spawned successfully. 0 failed.
future::plan(future::cluster(workers = mpi_cluster))
formals(future::plan())$workers
#> [[1]]
#> $rank
#> [1] 1
#>
#> $RECVTAG
#> [1] 33
#>
#> $SENDTAG
#> [1] 22
#>
#> $comm
#> [1] 1
#>
#> attr(,"class")
#> [1] "MPInode"
#>
#> attr(,"class")
#> [1] "spawnedMPIcluster" "MPIcluster"        "cluster"

# snow::stopCluster(mpi_cluster)

Created on 2021-04-05 by the reprex package (v1.0.0)

I also found another scenario in the introduction vignette of the future.batchtools package where the workers argument is actually NULL. They use nested futures where the outer layer seems to be some kind of cluster while the inner layer utilizes multicore-processing on individual cluster nodes.

future::plan(list(future.batchtools::batchtools_torque, future::multisession))
future::plan()
#> batchtools_torque:
#> - args: function (expr, envir = parent.frame(), substitute = TRUE, globals = TRUE, label = NULL, template = NULL, resources = list(), workers = NULL, registry = list(), ...)
#> - tweaked: FALSE
#> - call: future::plan(list(future.batchtools::batchtools_torque, future::multisession))
formals(future::plan())$workers
#> NULL

Created on 2021-04-05 by the reprex package (v1.0.0)

For both, the list and the NULL case, try_to_get_num_future_cores returns NULL.
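One hedged way to also recognize the MPI case above would be to treat any cluster object as a list of nodes (this is my assumption, not part of the PR):

```r
# snow/parallel cluster objects are lists of nodes, so their length is the
# number of workers; whether those nodes share one machine is unknown here.
workers_arg <- formals(future::plan())$workers
if (inherits(workers_arg, "cluster")) {
  n_nodes <- length(workers_arg)
}
```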

What's your opinion on the three options that I listed in my previous comment? Were my descriptions understandable?

Can you try to exit as soon as possible to make the code more readable with fewer nested if-else

I did that, and I also think the code is more readable now. However, I omitted the
verbose("Parallelization strategy: <some strategy>")
lines because they would cause too much clutter if one wanted to include them at every place where something is returned. They could be included in must_disable_openmp instead, or we could create one short additional function that prints a message based on the value returned by try_to_get_num_future_cores.

Below you can see the current version of the function. I still think that using
eval(workers_arg, envir = environment(future::<some function>))
could be meaningful. At least it would make the function shorter, because we would not have to include the
if (workers_arg == "availableCores()") { return(future::availableCores()) }
blocks. Also, if it works the way I understand it, it would potentially cover more cases than just checking the known values of the workers argument.
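The mechanism can be illustrated without the future package at all (a hypothetical sketch, not part of the PR; the base parallel package stands in for future here so the snippet runs without any extra packages, but the same reasoning applies to eval'ing availableCores() in future's namespace):

```r
# Sketch: why eval() needs the right environment. A default "workers"
# argument like quote(detectCores()) is an unevaluated call; in a session
# where the owning package is not attached, a plain eval() cannot find
# the function, but evaluating inside the package's namespace can.
workers_arg <- quote(detectCores())

# Usually fails in a fresh session ('parallel' is not attached by default):
plain <- tryCatch(eval(workers_arg), error = function(e) NULL)

# Succeeds: the call is resolved in the namespace of the parallel package,
# obtained via the environment of any of its exported functions.
resolved <- eval(workers_arg, envir = environment(parallel::makeCluster))

is.null(plain)          # TRUE, unless 'parallel' happens to be attached
length(resolved) == 1L  # TRUE: a single core count
```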

I have included both variants in the function below, and for simple future::plan()s they behave equivalently, as you can see in the tests below the function.

#' Counts the number of cores used by simple multi-core future::plan()s
#'
#' lidR employs two different parallelization mechanisms: OpenMP for
#' multi-threaded execution of certain algorithms and the future API for
#' processing LAScatalog tiles in parallel. Since the future API can be used for
#' different kinds of parallelization (including multi-threading but also
#' parallel execution on computer clusters), it is possible for OpenMP and
#' future to have incompatible settings. Example: a user with a single
#' four-core machine sets both the number of OpenMP threads and the number of
#' future processes to four. In this case OpenMP has to be disabled so that it
#' doesn't interfere with the future parallelization.
#'
#' Essentially, this function analyzes the argument to the 'workers' parameter
#' of future::plan(). When this argument is either 1) an integer, 2) a character
#' vector of strings all indicating the "localhost", or 3) a function call which
#' returns 1) or 2), then the integer or the number of localhost strings is
#' returned as the number of used cores. Currently, other cases cannot be
#' evaluated.
#'
#' @param test_default_arguments logical. If TRUE, the 'workers' call is first
#'   compared against the known default arguments of the future strategies
#'   (such as availableCores()); if FALSE, a generic evaluation of the call in
#'   the environment of the future package is attempted instead.
#'
#' @return A single integer giving the number of cores or NULL if the current
#'   future::plan() could not be evaluated. The latter might be the case when a
#'   future::plan() involves complex architectures such as remote computers or
#'   multiple nodes on a HPC.
try_to_get_num_future_cores = function(test_default_arguments)
{
  # nocov start
  
  # get the current plan
  plan <- future::plan()
  
  # if the plan is explicitly single-threaded, return 1
  if (is(plan, "uniprocess")) { return(1L) }
  # "uniprocess" covers future::sequential and future::transparent and hopefully
  # also single-threaded plans provided by other packages
  
  # if there is no argument called "workers", return NULL
  if (!"workers" %in% names(formals(plan))) { return(NULL) }
  
  # get the value of the "workers" argument
  workers_arg <- formals(plan)$workers
  
  # These values might be used to reference the current machine and are compared
  # to the "workers" argument in the code below
  localhosts <- c("localhost", "127.0.0.1", Sys.info()[["nodename"]])
  # taken from the documentation of parallelly::makeClusterPSOCK and
  # parallelly::makeNodePSOCK hoping that they will also be valid for other
  # interfaces
  
  # Now check whether a number of locally used cores can be extracted from the
  # "workers" argument
  
  # if "workers" is a single non-negative number, return it as an integer
  if (length(workers_arg) == 1L &&
      is.numeric(workers_arg) &&
      !is.na(workers_arg) &&
      workers_arg >= 0)
  {
    return(as.integer(workers_arg))
  }
  
  # if "workers" is a character vector of "localhosts", return the vector length
  if (length(workers_arg) >= 1L &&
      is.character(workers_arg) &&
      all(workers_arg %in% localhosts))
  {
    return(length(workers_arg))
  }
  
  # if "workers" is a function call, check whether the return value is one of
  # the above tested options
  if (is.call(workers_arg))
  {
    
    if (test_default_arguments)
    {
      # availableCores() is the default "workers" argument for
      # future::multisession and future::multiprocess. It cannot be evaluated
      # directly because this doesn't work when the future package is not loaded
      if (workers_arg == "availableCores()")
      {
        # The result can be returned directly because it is always a positive integer
        return(future::availableCores())
      }
      
      # availableCores(constraints = "multicore") is the default "workers"
      # argument for future::multicore
      if (workers_arg == 'availableCores(constraints = "multicore")')
      {
        return(future::availableCores(constraints = "multicore"))
      }
      
      # availableWorkers() is the default "workers" argument for future::cluster
      if (workers_arg == "availableWorkers()")
      {
        available_workers <- future::availableWorkers()
        
        # availableWorkers() returns a character vector of worker names. If all
        # of them are "localhosts", return the vector length.
        if (length(available_workers) >= 1L &&
            is.character(available_workers) &&
            all(available_workers %in% localhosts))
        {
          return(length(available_workers))
        }
      }
    }
    
    # if none of the above applied, try to evaluate the function call
    evaluated_workers_arg <- tryCatch(
      eval(workers_arg),
      error = function(dummy) { return(NULL) } # on error: return NULL
    )
    
    # I would prefer the following over manually comparing calls to default
    # values (as done above)
    if (!test_default_arguments)
    {
      if (is.null(evaluated_workers_arg)) # if the "normal" evaluation didn't work
      {
        # Try to evaluate the function call in the environment of the future
        # package. This deals with situations where functions cannot be evaluated
        # because the future package has not been loaded (using library(future),
        # library(lidR), or others).
        evaluated_workers_arg <- tryCatch(
          # use an arbitrary future function to get the environment of the future package
          eval(workers_arg, envir = environment(future::plan)),
          error = function(dummy) { return(NULL) }
        )
      }
    }
    
    # if the call could not be evaluated, return NULL
    if (is.null(evaluated_workers_arg)) { return(NULL) }
    
    # If the call could be evaluated, check if the result is a number or a
    # vector of localhosts.
    if (length(evaluated_workers_arg) == 1L &&
        is.numeric(evaluated_workers_arg) &&
        !is.na(evaluated_workers_arg) &&
        evaluated_workers_arg >= 0)
    {
      return(as.integer(evaluated_workers_arg))
    }
    
    if (length(evaluated_workers_arg) >= 1L &&
        is.character(evaluated_workers_arg) &&
        all(evaluated_workers_arg %in% localhosts))
    {
      return(length(evaluated_workers_arg))
    }
    
  } # end if (is.call(workers_arg))
  
  # -> "workers" is neither a number, nor a character vector of localhosts,
  # nor a function call returning any of these values.
  return(NULL)
  
  # nocov end
}

# sequential
future::plan(future::sequential)
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> [1] 1
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 1

# transparent
future::plan(future::transparent)
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> [1] 1
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 1

# multisession
future::plan(future::multisession)
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> system 
#>      4
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 4

# multicore
future::plan(future::multicore)
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> system 
#>      1
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 1

# multiprocess (deprecated)
suppressWarnings(future::plan(future::multiprocess))
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> system 
#>      4
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 4

# cluster
future::plan(future::cluster)
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> [1] 4
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 4

# remote
future::plan(future::remote(workers = c("localhost", "localhost")))
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> [1] 2
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 2

# multisession - custom number of workers
future::plan(future::multisession(workers = 2))
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> [1] 2
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 2

# multisession - supplying availableCores "manually"
future::plan(future::multisession(workers = future::availableCores()))
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> [1] 4
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 4

# multisession - more workers than cores (apparently this is possible with future)
future::availableCores()
#> system 
#>      4
future::plan(future::multisession(workers = 8))
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> [1] 8
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> [1] 8

# cluster - workers from parallel package
future::plan(future::cluster, workers = parallel::makeCluster(3, type='SOCK'))
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> NULL
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> NULL

# nested futures
future::plan(list(future.batchtools::batchtools_torque, future::multisession))
try_to_get_num_future_cores(test_default_arguments = TRUE)
#> NULL
try_to_get_num_future_cores(test_default_arguments = FALSE)
#> NULL

# stop any clusters
future::plan(future::sequential)

Created on 2021-04-06 by the reprex package (v1.0.0)

@Jean-Romain
Collaborator

What's your opinion on the three options that I listed in my previous comment? Were my descriptions understandable?

That's great! I really appreciate your contribution, thank you. In both mentioned cases NULL is the right output. I didn't know how to set up an MPI cluster myself, so I guess somebody doing it will be able to understand the message about OpenMP. For nested plans, in my opinion we don't care, because the catalog processing does not support nested strategies. I plan to add such a capability to be able, e.g., to send the computation to several remote machines where each machine could use several cores. But right now it is not possible. So NULL is the way.

However, I omitted the verbose("Parallelization strategy: <some strategy>") [...]

You're right and it is not a big deal 😉

I still think that using eval(workers_arg, envir = environment(future::<some function>))
could be meaningful. At least it would make the function shorter, because we would not have to include the
if (workers_arg == "availableCores()") { return(future::availableCores()) } blocks. Also, if it works the way I understand it, it would potentially cover more cases than just checking the known values of the workers argument.

eval(workers_arg, envir = environment(future::plan))

Excellent. This is the way! 👍

When making the PR do not forget to update the DESCRIPTION file to add your name (or pseudonym as you wish).

@Jean-Romain
Collaborator

Also I noticed your PR is from devel to devel. Please make the PR from master to master because it is a fix that must be released with the next minor update. Thanks

@Jean-Romain Jean-Romain added Bug A bug in the package Enhancement Not actually a bug but a possible improvement labels Apr 7, 2021
@Jean-Romain Jean-Romain self-assigned this Apr 7, 2021
@Jean-Romain Jean-Romain closed this Apr 8, 2021