RcppParallel provides a complete toolkit for creating portable, high-performance parallel algorithms without requiring direct manipulation of operating system threads. RcppParallel includes:
Intel TBB, a C++ library for task parallelism with a wide variety of parallel algorithms and data structures (Windows, OS X, Linux, and Solaris x86 only).
TinyThread, a C++ library for portable use of operating system threads.
RVector
and RMatrix
wrapper classes for safe and convenient access to R data structures in a multi-threaded environment.
High level parallel functions (parallelFor
and parallelReduce
) that use Intel TBB as a back-end on systems that support it and TinyThread on other platforms.
Some simple example uses of RcppParallel along with performance increases achieved over serial code. The benchmarks were executed on a 2.6GHz Haswell MacBook Pro with 4 cores (8 with hyperthreading).
Parallel Matrix Transform — Demonstrates using parallelFor
to transform a matrix (take the square root of each element) in parallel. In this example the parallel version performs about 2.5x faster than the serial version.
Parallel Vector Sum — Demonstrates using parallelReduce
to take the sum of a vector in parallel. In this example the parallel version performs 4.5x faster than the serial version.
Parallel Distance Matrix — Demonstrates using parallelFor
to compute pairwise distances for each row in an input data matrix. In this example the parallel version performs 5.5x faster than the serial version.
Parallel Inner Product — Demonstrates using parallelReduce
to compute the inner product of two vectors in parallel. In this example the parallel version performs 2.5x faster than the serial version.
You may get the hang of using RcppParallel by studying the examples however you should still also review this guide in detail as it includes important documentation on thread safety, tuning, and using Intel TBB directly for more advanced use cases.
You can install the RcppParallel package from CRAN as follows:
install.packages("RcppParallel")
Add the following to a standalone C++ source file to import RcppParallel:
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
When you compile the file using Rcpp::sourceCpp
the required compiler and linker settings for RcppParallel will be automatically included in the compilation.
If you want to use RcppParallel from within an R package you need to edit several files to create the requisite build and runtime links. The following additions should be made:
DESCRIPTION
Imports: RcppParallel
LinkingTo: RcppParallel
SystemRequirements: GNU make
NAMESPACE
importFrom(RcppParallel, RcppParallelLibs)
src\Makevars
PKG_LIBS += $(shell ${R_HOME}/bin/Rscript -e "RcppParallel::RcppParallelLibs()")
src\Makevars.win
PKG_CXXFLAGS += -DRCPP_PARALLEL_USE_TBB=1
PKG_LIBS += $(shell "${R_HOME}/bin${R_ARCH_BIN}/Rscript.exe" \
-e "RcppParallel::RcppParallelLibs()")
Note that the Windows variation (Makevars.win) requires an extra PKG_CXXFLAGS
entry that enables the use of TBB. This is because TBB is not used by default on Windows (for backward compatibility with a previous version of RcppParallel which lacked support for TBB on Windows).
After you’ve added the above to the package you can simply include the main RcppParallel package header in source files that need to use it:
#include <RcppParallel.h>
A major goal of RcppParallel is to make it possible to write parallel code without traditional threading and locking primitives (which are notoriously complicated and difficult to get right). This is achieved for the most part by parallelFor
and parallelReduce
however the fact that the R API itself is single-threaded must also be taken into consideration.
The code that you write within parallel workers should not call the R or Rcpp API in any fashion. This is because R is single-threaded and concurrent interaction with it’s data structures can cause crashes and other undefined behavior. Here is the official guidance from Writing R Extensions:
Calling any of the R API from threaded code is ‘for experts only’: they will need to read the source code to determine if it is thread-safe. In particular, code which makes use of the stack-checking mechanism must not be called from threaded code.
Not being able to call the R or Rcpp API creates an obvious challenge: how to read and write to R vectors and matrices. Fortunately, R vectors and matrices are just contiguous arrays of int
, double
, etc. so can be accessed using traditional array and pointer offsets. The next section describes a safe and high level way to do this.
To provide safe and convenient access to the arrays underlying R vectors and matrices RcppParallel introduces several accessor classes:
RVector<T>
— Wrap R vectors of various types
RMatrix<T>
— Wrap R matrices of various types (also includes Row
and Column
classes)
To create a thread safe accessor for an Rcpp vector or matrix just construct an instance of RVector
or RMatrix
with it. For example:
// [[Rcpp::export]]
IntegerVector transformVector(IntegerVector x) {
RVector<int> input(x);
// etc...
}
Similarly, if you need to return a vector as a result of a parallel transformation you should first create it using Rcpp then construct a wrapper for writing from multiple threads. For example:
// [[Rcpp::export]]
IntegerVector transformVector(IntegerVector x) {
RVector<int> input(x); // create threadsafe wrapper to input
IntegerVector y(x.size()); // allocate output vector
RVector<int> output(y); // create threadsafe wrapper to output
// ...transform vector in parallel ...
return y;
}
When using RcppParallel you typically do not need to worry about explicit locking, as the mechanics of parallelFor
and parallelReduce
(explained below) take care of providing safe windows into input and output data that have no possibility of contention. Nevertheless, if for some reason you do need to synchronize access to shared data, you can use the TinyThread locking classes (automatically available via RcppParallel.h
):
Function | Description |
---|---|
lock_guard |
Lock guard class. The constructor locks the mutex, and the destructor unlocks the mutex, so the mutex will automatically be unlocked when the lock guard goes out of scope. |
mutex |
Mutual exclusion object for synchronizing access to shared memory areas for several threads. The mutex is non-recursive (i.e. a program may deadlock if the thread that owns a mutex object calls lock() on that object). |
recursive_mutex |
Mutual exclusion object for synchronizing access to shared memory areas for several threads. The mutex is recursive (i.e. a thread may lock the mutex several times, as long as it unlocks the mutex the same number of times). |
fast_mutex |
Mutual exclusion object for synchronizing access to shared memory areas for several threads. It is similar to the tthread::mutex class, but instead of using system level functions, it is implemented as an atomic spin lock with very low CPU overhead. |
See the complete TinyThread documentation for additional details.
The TinyThread locking primitives will work on all platforms. If you are using TBB directly you can alternatively use the synchronization classes provided by TBB. See the section on TBB Synchronization for additional details.
RcppParallel provides two high level parallel algorithms: parallelFor
can be used to convert the work of a standard serial “for” loop into a parallel one and parallelReduce
can be used for accumulating aggregate or other values.
To use parallelFor
, you create a Worker
object that defines an operator()
which is called by the parallel scheduler. This function is passed a [begin,end)
exclusive range which is a safe window (i.e. not in use by other threads) into the input or output data. Note that the end
element is not included in the range (just like an STL end
iterator).
For example, here’s a Worker
object that takes the square root of it’s input and writes it into it’s output:
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;
struct SquareRoot : public Worker
{
// source matrix
const RMatrix<double> input;
// destination matrix
RMatrix<double> output;
// initialize with source and destination
SquareRoot(const NumericMatrix input, NumericMatrix output)
: input(input), output(output) {}
// take the square root of the range of elements requested
void operator()(std::size_t begin, std::size_t end) {
std::transform(input.begin() + begin,
input.begin() + end,
output.begin() + begin,
::sqrt);
}
};
Note that SquareRoot
derives from RcppParallel::Worker
. This is required for function objects passed to parallelFor
.
Here’s a function that calls the SquareRoot
worker we defined:
// [[Rcpp::export]]
NumericMatrix parallelMatrixSqrt(NumericMatrix x) {
// allocate the output matrix
NumericMatrix output(x.nrow(), x.ncol());
// SquareRoot functor (pass input and output matrixes)
SquareRoot squareRoot(x, output);
// call parallelFor to do the work
parallelFor(0, x.length(), squareRoot);
// return the output matrix
return output;
}
To use parallelReduce
you create a Worker
object as well, this object should include:
A standard and “splitting” constructor. The standard constructor takes the input data and initializes whatever value is being accumulated (e.g. initialize a sum to zero). The splitting constructor is called when work needs to be split onto other threads—it takes a reference to the instance it is being split from and simply copies the pointer to the input data and initializes it’s “accumulated” value to zero.
An operator() which performs the work. This works just like the operator() in parallelFor
, but instead of writing to another vector or matrix it typically will accumulate a value.
A join method which composes the operations of two worker instances that were previously split. Here we simply combine the accumulated value of the instance we are being joined with to our own.
For example, here’s a Worker
object that is used to sum a vector:
// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;
struct Sum : public Worker
{
// source vector
const RVector<double> input;
// accumulated value
double value;
// constructors
Sum(const NumericVector input) : input(input), value(0) {}
Sum(const Sum& sum, Split) : input(sum.input), value(0) {}
// accumulate just the element of the range I've been asked to
void operator()(std::size_t begin, std::size_t end) {
value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
}
// join my value with that of another Sum
void join(const Sum& rhs) {
value += rhs.value;
}
};
Now that we’ve defined the Worker, implementing the parallel sum function is straightforward. Just initialize an instance of Sum
with an input vector and call parallelReduce
:
// [[Rcpp::export]]
double parallelVectorSum(NumericVector x) {
// declare the Sum instance
Sum sum(x);
// call parallel_reduce to start the work
parallelReduce(0, x.length(), sum);
// return the computed sum
return sum.value;
}
RcppParallel provides the parallelFor
and parallelReduce
algorithms however the TBB library includes a wealth of more advanced algorithms and other tools for parallelization. See the Intel TBB article for additional details.
There are several settings available for tuning the behavior of parallel algorithms. These settings as well as benchmarking techniques are covered below.
The grain size of a parallel algorithm sets a minimum chunk size for parallelization. In other words, at what point to stop processing input on separate threads (as sometimes creating more threads can degrade the performance of an algorithm by introducing excessive synchronization overhead).
By default the grain size for TBB (and thus for parallelFor
and parallelReduce
) is 1. You can change the grain size by passing an additional parameter to these functions. For example:
parallelReduce(0, x.length(), sum, 100);
This will prevent the creation of threads that process less than 100 items. You should experiment with various chunk sizes and use the benchmarking tools described below to measure their effectiveness. The Intel TBB website includes a detailed discussion of grain sizes and partitioning which has some useful guidelines for tweaking grain sizes.
By default all of the available cores on a machine are used for parallel algorithms. You may instead want to use a fixed number of threads or a fixed proportion of cores available on the machine.
R rather than C++ functions are provided to control these settings so that users of your algorithm can control the use of resources on their system. You can call the setThreadOptions
function to allocate threads. For example, the following sets a maximum of 4 threads:
RcppParallel::setThreadOptions(numThreads = 4)
To use a proportion of available cores you can use the defaultNumThreads
function. For example, the following says to use half of the available cores on a system:
library(RcppParallel)
setThreadOptions(numThreads = defaultNumThreads() / 2)
As you experiment with various settings to tune your parallel algorithms you should always measure the results. The rbenchmark package has some useful tools for doing this. For example, here’s a benchmark of the parallel matrix square root example from above (in this case it’s a comparison against the serial version):
# allocate a matrix
m <- matrix(as.numeric(c(1:1000000)), nrow = 1000, ncol = 1000)
# ensure that serial and parallel versions give the same result
stopifnot(identical(matrixSqrt(m), parallelMatrixSqrt(m)))
# compare performance of serial and parallel
library(rbenchmark)
res <- benchmark(matrixSqrt(m),
parallelMatrixSqrt(m),
order="relative")
res[,1:4]
test replications elapsed relative
2 parallelMatrixSqrt(m) 100 0.294 1.000
1 matrixSqrt(m) 100 0.755 2.568