How to parallelize future_pmap() across multiple Slurm nodes

I have access to a large computing cluster with many nodes each of which has >16 cores, running Slurm 20.11.3. I want to run a job in parallel using furrr::future_pmap(). I can parallelize across multiple cores on a single node but I have not been able to figure out the correct syntax to take advantage of cores on multiple nodes. See this related question.

Here is a reproducible example where I made a function that sleeps for 5 seconds and returns the starting time, ending time, and the node name.

library(furrr)

# Set up parallel processing 
options(mc.cores = 64)
plan(
    list(tweak(multicore, workers = 16),
         tweak(multicore, workers = 16),
         tweak(multicore, workers = 16),
         tweak(multicore, workers = 16))
)


fake_fn <- function(x) {
  t1 <- Sys.time()
  Sys.sleep(x)
  t2 <- Sys.time()
  hn <- system2('hostname', stdout = TRUE)
  data.frame(start=t1, end=t2, hostname=hn)
}

stuff <- data.frame(x = rep(5, 64))

output <- future_pmap_dfr(stuff, function(x) fake_fn(x))

I ran the job by requesting an allocation with salloc --nodes=4 --ntasks=64 and then running the above R script interactively.

The script runs in about 20 seconds and returns the same hostname for all rows, indicating that it is running 16 iterations simultaneously on one node but not 64 iterations simultaneously split across 4 nodes as intended. How should I change the plan() syntax so that I can take advantage of the multiple nodes?

edit: I also tried a couple other things:

  • I replaced multicore with multisession, but saw no difference in output.
  • I replaced the plan(list(...)) with plan(cluster(workers = availableWorkers())) but it just hangs.

Solution 1:

options(mc.cores = 64)
plan(
    list(tweak(multicore, workers = 16),
         tweak(multicore, workers = 16),
         tweak(multicore, workers = 16),
         tweak(multicore, workers = 16))
)

Sorry, this does not work. When you specify a list of future strategies like this, you are specifying what should be used in nested future calls. In your future_pmap_dfr() example, it's only the first level in this list that will be used. The other three levels are never used. See https://future.futureverse.org/articles/future-3-topologies.html for more details.
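
To illustrate what such a list is for, here is a minimal sketch of a nested topology: an outer level that distributes over the nodes and an inner level that runs multicore within each node. The worker counts and the use of unique(availableWorkers()) are assumptions you would adapt to your allocation, and only nested future calls (not a single future_pmap_dfr()) would make use of both levels.

library(future)

# Hypothetical nested topology: one cluster worker per node (outer level),
# then 16 multicore workers within each node (inner level)
plan(list(
    tweak(cluster, workers = unique(availableWorkers())),
    tweak(multicore, workers = 16)
))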


I replaced ... with plan(cluster(workers = availableWorkers())) ...

Yes,

plan(cluster, workers = availableWorkers())

which is equivalent to the default,

plan(cluster)

is the correct attempt here.
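
Put together with your reproducible example (reusing fake_fn() from your question), a minimal sketch under the same salloc allocation would be:

library(furrr)

# One PSOCK worker per Slurm task, spread across the four nodes
plan(cluster, workers = availableWorkers())

stuff <- data.frame(x = rep(5, 64))
output <- future_pmap_dfr(stuff, function(x) fake_fn(x))

# Should now report all four node names
table(output$hostname)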

... but it just hangs.

There could be two things going on here. The first is that the workers are set up one by one, so if you have lots of them it will take quite a while before plan() completes. I recommend that you try with only two workers to confirm whether or not it works. You can also turn on debug output to see what happens, i.e.

library(future)
options(parallelly.debug = TRUE)
plan(cluster)
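
For the two-worker test mentioned above, a minimal sketch (assuming availableWorkers() returns at least two entries) is:

library(future)
options(parallelly.debug = TRUE)

# Launch only the first two PSOCK workers to check that setup works at all
plan(cluster, workers = availableWorkers()[1:2])
nbrOfWorkers()  # should report 2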

Second, using a PSOCK cluster across nodes requires that you have SSH access to those parallel workers. Not all HPC environments support that; for example, they might prevent users from SSHing into compute nodes. This could also be what you're experiencing. As above, turn on debugging to figure out where it stalls.

Now, even if you managed to get this working, you would run into a limitation in R that caps you at 125 parallel workers, and typically a bit fewer in practice. You can read more about this limit in https://github.com/HenrikBengtsson/Wishlist-for-R/issues/28. It also shows that one can tweak the R source code and recompile to increase this limit to thousands.
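
If you want to check where that ceiling sits on your own system, a small sketch, assuming a reasonably recent version of the parallelly package (which future builds on), is:

library(parallelly)

# Total number of R connections the build supports (128 on a standard build)
availableConnections()

# Number of connections currently free for, e.g., PSOCK workers
freeConnections()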


An alternative to the above is to use the future.batchtools package;

plan(future.batchtools::batchtools_slurm, workers = availableCores())

This would result in the tasks in future_pmap_dfr() being resolved via n = availableCores() Slurm jobs. Of course, this comes with the extra overhead of the scheduler, e.g. queueing, launching, running, finishing, and reading the data back.
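
As a sketch of what that could look like, assuming you have a batchtools Slurm template file in place (the template name and the resources list below are placeholders for your site's setup):

library(furrr)
library(future.batchtools)

# Each chunk of the future_pmap_dfr() call is submitted as its own Slurm job
plan(future.batchtools::batchtools_slurm,
     template  = "slurm.tmpl",
     resources = list(ncpus = 1, walltime = 3600))

output <- future_pmap_dfr(stuff, function(x) fake_fn(x))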

BTW, the best place to discuss these things is on https://github.com/HenrikBengtsson/future/discussions.