Overview

RcppParallel provides a complete toolkit for creating portable, high-performance parallel algorithms without requiring direct manipulation of operating system threads. RcppParallel includes:

  • Intel TBB (v4.3), a C++ library for task parallelism with a wide variety of parallel algorithms and data structures (Windows, OS X, Linux, and Solaris x86 only).

  • TinyThread, a C++ library for portable use of operating system threads.

  • RVector and RMatrix wrapper classes for safe and convenient access to R data structures in a multi-threaded environment.

  • High level parallel functions (parallelFor and parallelReduce) that use Intel TBB as a back-end on systems that support it and TinyThread on other platforms.

Examples

Some simple example uses of RcppParallel along with performance increases achieved over serial code. The benchmarks were executed on a 2.6GHz Haswell MacBook Pro with 4 cores (8 with hyperthreading).

Parallel Matrix Transform — Demonstrates using parallelFor to transform a matrix (take the square root of each element) in parallel. In this example the parallel version performs about 2.5x faster than the serial version.

Parallel Vector Sum — Demonstrates using parallelReduce to take the sum of a vector in parallel. In this example the parallel version performs 4.5x faster than the serial version.

Parallel Distance Matrix — Demonstrates using parallelFor to compute pairwise distances for each row in an input data matrix. In this example the parallel version performs 5.5x faster than the serial version.

Parallel Inner Product — Demonstrates using parallelReduce to compute the inner product of two vectors in parallel. In this example the parallel version performs 2.5x faster than the serial version.

You may get the hang of using RcppParallel by studying the examples however you should still also review this guide in detail as it includes important documentation on thread safety, tuning, and using Intel TBB directly for more advanced use cases.

Getting Started

You can install the RcppParallel package from CRAN as follows:

install.packages("RcppParallel")

sourceCpp

Add the following to a standalone C++ source file to import RcppParallel:

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>

When you compile the file using Rcpp::sourceCpp the required compiler and linker settings for RcppParallel will be automatically included in the compilation.

R Packages

If you want to use RcppParallel from within an R package you need to edit several files to create the requisite build and runtime links. The following additions should be made:

DESCRIPTION

Imports: RcppParallel
LinkingTo: RcppParallel
SystemRequirements: GNU make

NAMESPACE

importFrom(RcppParallel, RcppParallelLibs)

src\Makevars

CXX_STD = CXX11
PKG_LIBS += $(shell ${R_HOME}/bin/Rscript -e "RcppParallel::RcppParallelLibs()")

src\Makevars.win

CXX_STD = CXX11
PKG_CXXFLAGS += -DRCPP_PARALLEL_USE_TBB=1

PKG_LIBS += $(shell "${R_HOME}/bin${R_ARCH_BIN}/Rscript.exe" \
              -e "RcppParallel::RcppParallelLibs()")

Note that the Windows variation (Makevars.win) requires an extra PKG_CXXFLAGS entry that enables the use of TBB. This is because TBB is not used by default on Windows (for backward compatibility with a previous version of RcppParallel which lacked support for TBB on Windows).

TBB uses a subset of C++11 features – for example, long long is not defined in the C++98 standard. Although most compilers make this type available as a compiler extension, they may emit warnings / errors when using these types while compiling in C++98 mode. To ensure that your package builds on CRAN without these warnings, you can enforce compilation using C++11, as done by CXX_STD = CXX11.

After you’ve added the above to the package you can simply include the main RcppParallel package header in source files that need to use it:

#include <RcppParallel.h>

Thread Safety

A major goal of RcppParallel is to make it possible to write parallel code without traditional threading and locking primitives (which are notoriously complicated and difficult to get right). This is achieved for the most part by parallelFor and parallelReduce however the fact that the R API itself is single-threaded must also be taken into consideration.

API Restrictions

The code that you write within parallel workers should not call the R or Rcpp API in any fashion. This is because R is single-threaded and concurrent interaction with it’s data structures can cause crashes and other undefined behavior. Here is the official guidance from Writing R Extensions:

Calling any of the R API from threaded code is ‘for experts only’: they will need to read the source code to determine if it is thread-safe. In particular, code which makes use of the stack-checking mechanism must not be called from threaded code.

Not being able to call the R or Rcpp API creates an obvious challenge: how to read and write to R vectors and matrices. Fortunately, R vectors and matrices are just contiguous arrays of int, double, etc. so can be accessed using traditional array and pointer offsets. The next section describes a safe and high level way to do this.

Safe Accessors

To provide safe and convenient access to the arrays underlying R vectors and matrices RcppParallel introduces several accessor classes:

  • RVector<T> — Wrap R vectors of various types

  • RMatrix<T> — Wrap R matrices of various types (also includes Row and Column classes)

To create a thread safe accessor for an Rcpp vector or matrix just construct an instance of RVector or RMatrix with it. For example:

// [[Rcpp::export]]
IntegerVector transformVector(IntegerVector x) {
  RVector<int> input(x);
  // etc...
}

Similarly, if you need to return a vector as a result of a parallel transformation you should first create it using Rcpp then construct a wrapper for writing from multiple threads. For example:

// [[Rcpp::export]]
IntegerVector transformVector(IntegerVector x) {
  RVector<int> input(x);        // create threadsafe wrapper to input
  IntegerVector y(x.size());    // allocate output vector
  RVector<int> output(y);       // create threadsafe wrapper to output
  
  // ...transform vector in parallel ...
  
  return y;
}

Locking

When using RcppParallel you typically do not need to worry about explicit locking, as the mechanics of parallelFor and parallelReduce (explained below) take care of providing safe windows into input and output data that have no possibility of contention. Nevertheless, if for some reason you do need to synchronize access to shared data, you can use the TinyThread locking classes (automatically available via RcppParallel.h):

Function Description
lock_guard Lock guard class. The constructor locks the mutex, and the destructor unlocks the mutex, so the mutex will automatically be unlocked when the lock guard goes out of scope.
mutex Mutual exclusion object for synchronizing access to shared memory areas for several threads. The mutex is non-recursive (i.e. a program may deadlock if the thread that owns a mutex object calls lock() on that object).
recursive_mutex Mutual exclusion object for synchronizing access to shared memory areas for several threads. The mutex is recursive (i.e. a thread may lock the mutex several times, as long as it unlocks the mutex the same number of times).
fast_mutex Mutual exclusion object for synchronizing access to shared memory areas for several threads. It is similar to the tthread::mutex class, but instead of using system level functions, it is implemented as an atomic spin lock with very low CPU overhead.

See the complete TinyThread documentation for additional details.

The TinyThread locking primitives will work on all platforms. If you are using TBB directly you can alternatively use the synchronization classes provided by TBB. See the section on TBB Synchronization for additional details.

Algorithms

RcppParallel provides two high level parallel algorithms: parallelFor can be used to convert the work of a standard serial “for” loop into a parallel one and parallelReduce can be used for accumulating aggregate or other values.

parallelFor

To use parallelFor, you create a Worker object that defines an operator() which is called by the parallel scheduler. This function is passed a [begin,end) exclusive range which is a safe window (i.e. not in use by other threads) into the input or output data. Note that the end element is not included in the range (just like an STL end iterator).

For example, here’s a Worker object that takes the square root of it’s input and writes it into it’s output:

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

struct SquareRoot : public Worker
{
   // source matrix
   const RMatrix<double> input;
   
   // destination matrix
   RMatrix<double> output;
   
   // initialize with source and destination
   SquareRoot(const NumericMatrix input, NumericMatrix output) 
      : input(input), output(output) {}
   
   // take the square root of the range of elements requested
   void operator()(std::size_t begin, std::size_t end) {
      std::transform(input.begin() + begin, 
                     input.begin() + end, 
                     output.begin() + begin, 
                     ::sqrt);
   }
};

Note that SquareRoot derives from RcppParallel::Worker. This is required for function objects passed to parallelFor.

Here’s a function that calls the SquareRoot worker we defined:

// [[Rcpp::export]]
NumericMatrix parallelMatrixSqrt(NumericMatrix x) {
  
  // allocate the output matrix
  NumericMatrix output(x.nrow(), x.ncol());
  
  // SquareRoot functor (pass input and output matrixes)
  SquareRoot squareRoot(x, output);
  
  // call parallelFor to do the work
  parallelFor(0, x.length(), squareRoot);
  
  // return the output matrix
  return output;
}

parallelReduce

To use parallelReduce you create a Worker object as well, this object should include:

  1. A standard and “splitting” constructor. The standard constructor takes the input data and initializes whatever value is being accumulated (e.g. initialize a sum to zero). The splitting constructor is called when work needs to be split onto other threads—it takes a reference to the instance it is being split from and simply copies the pointer to the input data and initializes it’s “accumulated” value to zero.

  2. An operator() which performs the work. This works just like the operator() in parallelFor, but instead of writing to another vector or matrix it typically will accumulate a value.

  3. A join method which composes the operations of two worker instances that were previously split. Here we simply combine the accumulated value of the instance we are being joined with to our own.

For example, here’s a Worker object that is used to sum a vector:

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

struct Sum : public Worker
{   
   // source vector
   const RVector<double> input;
   
   // accumulated value
   double value;
   
   // constructors
   Sum(const NumericVector input) : input(input), value(0) {}
   Sum(const Sum& sum, Split) : input(sum.input), value(0) {}
   
   // accumulate just the element of the range I've been asked to
   void operator()(std::size_t begin, std::size_t end) {
      value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
   }
     
   // join my value with that of another Sum
   void join(const Sum& rhs) { 
      value += rhs.value; 
   }
};

Now that we’ve defined the Worker, implementing the parallel sum function is straightforward. Just initialize an instance of Sum with an input vector and call parallelReduce:

// [[Rcpp::export]]
double parallelVectorSum(NumericVector x) {
   
   // declare the Sum instance 
   Sum sum(x);
   
   // call parallel_reduce to start the work
   parallelReduce(0, x.length(), sum);
   
   // return the computed sum
   return sum.value;
}

TBB Algorithms

RcppParallel provides the parallelFor and parallelReduce algorithms however the TBB library includes a wealth of more advanced algorithms and other tools for parallelization. See the Intel TBB article for additional details.

Tuning

There are several settings available for tuning the behavior of parallel algorithms. These settings as well as benchmarking techniques are covered below.

Grain Size

The grain size of a parallel algorithm sets a minimum chunk size for parallelization. In other words, at what point to stop processing input on separate threads (as sometimes creating more threads can degrade the performance of an algorithm by introducing excessive synchronization overhead).

By default the grain size for TBB (and thus for parallelFor and parallelReduce) is 1. You can change the grain size by passing an additional parameter to these functions. For example:

parallelReduce(0, x.length(), sum, 100);

This will prevent the creation of threads that process less than 100 items. You should experiment with various chunk sizes and use the benchmarking tools described below to measure their effectiveness. The Intel TBB website includes a detailed discussion of grain sizes and partitioning which has some useful guidelines for tweaking grain sizes.

Threads Used

By default all of the available cores on a machine are used for parallel algorithms. You may instead want to use a fixed number of threads or a fixed proportion of cores available on the machine.

R rather than C++ functions are provided to control these settings so that users of your algorithm can control the use of resources on their system. You can call the setThreadOptions function to allocate threads. For example, the following sets a maximum of 4 threads:

RcppParallel::setThreadOptions(numThreads = 4)

To use a proportion of available cores you can use the defaultNumThreads function. For example, the following says to use half of the available cores on a system:

library(RcppParallel)
setThreadOptions(numThreads = defaultNumThreads() / 2)

Benchmarking

As you experiment with various settings to tune your parallel algorithms you should always measure the results. The rbenchmark package has some useful tools for doing this. For example, here’s a benchmark of the parallel matrix square root example from above (in this case it’s a comparison against the serial version):

# allocate a matrix
m <- matrix(as.numeric(c(1:1000000)), nrow = 1000, ncol = 1000)

# ensure that serial and parallel versions give the same result
stopifnot(identical(matrixSqrt(m), parallelMatrixSqrt(m)))

# compare performance of serial and parallel
library(rbenchmark)
res <- benchmark(matrixSqrt(m),
                 parallelMatrixSqrt(m),
                 order="relative")
res[,1:4]
                   test replications elapsed relative
2 parallelMatrixSqrt(m)          100   0.294    1.000
1         matrixSqrt(m)          100   0.755    2.568