Are 'j'-expressions in 'data.table' automatically parallelised?

How should I understand the parallelism built into data.table objects? From the getDTthreads function documentation, it seems that shared memory parallelism is employed using OpenMP. That seems fairly low level, and I imagine that it only works for a certain subset of overloaded functions and operators.

Or, is data.table somehow smart enough to split work for even more complicated expressions? More specifically, to parallelize a j-expression, what restrictions do I need to take into account?

Not to run too much afoul of Stack Overflow's question policy, here is an example. I often want to apply a function to each object in a huge data.table. For example,

library(data.table)
n <- 100000L
dt <- data.table(a = rnorm(n), b = rnorm(n))
dt[, c := sapply(a, function(x) paste(x, 'silly example'))]

Would the sapply call in the j-expression work on chunks of column a in parallel? Or is it a plain old base R sapply, which works sequentially?

If the latter is the case, then is embedding one of R's many parallel computing frameworks inside the j-expression a good approach? For example, can I safely and efficiently call foreach, future, et al. in the j-expression?


Solution 1:

From ?setDTthreads:

Internally parallelized code is used in the following places:

  • between.c - between()
  • cj.c - CJ()
  • coalesce.c - fcoalesce()
  • fifelse.c - fifelse()
  • fread.c - fread()
  • forder.c, fsort.c, and reorder.c - forder() and related
  • froll.c, frolladaptive.c, and frollR.c - froll() and family
  • fwrite.c - fwrite()
  • gsumm.c - GForce in various places, see GForce
  • nafill.c - nafill()
  • subset.c - Used in [.data.table subsetting
  • types.c - Internal testing usage

My understanding is that you should not expect data.table to make use of multithreading outside of the above use cases. Note that [.data.table uses multithreading for subsetting only, i.e., for materializing the rows selected by an i-expression, not for evaluating j-expressions. In something like x[!is.na(a) & a > 0], the relational and logical operators are still evaluated by plain (sequential) base R; only the subsequent row subset is done in parallel.
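A rough, machine-dependent sketch of that last point, timing the same logical subset with one thread and then with four (any gain may be modest, since computing the logical vector itself stays sequential):

library("data.table")
xs <- data.table(a = rnorm(2^25))

setDTthreads(1L)
t1 <- system.time(xs[!is.na(a) & a > 0])   # matching rows materialized by 1 thread

setDTthreads(4L)
t4 <- system.time(xs[!is.na(a) & a > 0])   # same subset, up to 4 threads

rbind(one_thread = t1, four_threads = t4)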

In an ungrouped j-expression, sum and sapply are still just base::sum and base::sapply. You can test this with a benchmark:

library("data.table")
setDTthreads(4L)

x <- data.table(a = rnorm(2^25))
microbenchmark::microbenchmark(sum(x$a), x[, sum(a)], times = 1000L)
Unit: milliseconds
        expr      min       lq     mean   median       uq      max neval
    sum(x$a) 51.61281 51.68317 51.95975 51.84204 52.09202 56.67213  1000
 x[, sum(a)] 51.78759 51.89054 52.18827 52.07291 52.33486 61.11378  1000
x <- data.table(a = seq_len(1e+04L))
microbenchmark::microbenchmark(sapply(x$a, paste, "is a good number"), x[, sapply(a, paste, "is a good number")], times = 1000L)
Unit: milliseconds
                                      expr      min      lq     mean   median       uq      max neval
    sapply(x$a, paste, "is a good number") 14.07403 15.7293 16.72879 16.31326 17.49072 45.62300  1000
 x[, sapply(a, paste, "is a good number")] 14.56324 15.9375 17.03164 16.48971 17.69045 45.99823  1000

where it is clear that simply putting code into a j-expression does not improve performance.
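The picture changes for grouped aggregations, where the GForce machinery listed above takes over: data.table swaps base::sum for its own multithreaded gsum. Asking for verbose = TRUE reports whether GForce was applied (the exact wording of the message varies between versions):

xg <- data.table(g = sample(1e4L, 2^22, replace = TRUE), a = rnorm(2^22))
xg[, sum(a), by = g, verbose = TRUE]   # verbose output should mention GForce / gsum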

data.table does recognize and handle certain other constructs specially. For instance, data.table uses its own radix-based forder instead of base::order when it sees x[order(...)]. (That substitution matters less now that base::order can use data.table's radix sort, which was contributed to base R, by passing method = "radix".) I haven't seen a "master list" of such exceptions.
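One quick way to probe that substitution is to benchmark against an explicit base::order call, which I assume bypasses the rewrite; the size of the gap, if any, will depend on your data, thread count, and R version, since base::order itself now applies a (single-threaded) radix sort to numeric input:

xo <- data.table(a = rnorm(2^22))
microbenchmark::microbenchmark(
  xo[order(a)],        # internally rewritten to use data.table's multithreaded forder
  xo[base::order(a)],  # assumed to fall back to plain base::order
  times = 10L
)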

As for whether using, e.g., parallel::mclapply inside of a j-expression can have performance benefits, I think the answer (as usual) depends on what you are trying to do and the scale of your data. Ultimately, you'll have to do your own benchmarks and profiling to find out. For example:

library("parallel")
cl <- makePSOCKcluster(4L)
microbenchmark::microbenchmark(x[, sapply(a, paste, "is a good number")], x[, parSapply(cl, a, paste, "is a good number")], times = 1000L)
stopCluster(cl)
Unit: milliseconds
                                             expr       min        lq      mean    median        uq      max neval
        x[, sapply(a, paste, "is a good number")] 14.553934 15.982681 17.105667 16.585525 17.864623 48.81276  1000
 x[, parSapply(cl, a, paste, "is a good number")]  7.675487  8.426607  9.022947  8.802454  9.334532 25.67957  1000

So it is possible to see speed-up, though sometimes you pay the price in memory usage. For small enough problems, the overhead associated with R-level parallelism can definitely outweigh the performance benefits.
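If you prefer one of the higher level frameworks mentioned in the question, the same pattern should carry over. For example, assuming the future and future.apply packages are installed, a sketch along these lines drops into j just like sapply does:

library("data.table")
library("future")
library("future.apply")

plan(multisession, workers = 4L)                    # start 4 background R sessions

xf <- data.table(a = seq_len(1e4L))
xf[, future_sapply(a, paste, "is a good number")]   # chunks a across the workers

plan(sequential)                                    # shut the workers down again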

You'll find a good thread about integrating parallel and data.table (including reasons not to) here.