UMBC logo
UMBC High Performance Computing Facility
How to run R programs on tara

Introduction

Running serial R code on the cluster is similar to running any other serial job. We give a serial example, and then demonstrate how to run a parallel job using the SNOW and Rmpi packages. Make sure you've read the tutorial for C programs first, to understand the basics of serial and parallel programming on tara.

For more information about the software, see the R website.

Serial example

In order to run R on the cluster nodes, we will need an R script and a batch submission script. Here is a sample script that makes a 3D plot of a hemisphere.
positive_integer_radius = 30
n = positive_integer_radius * 2+1
index = matrix((0:(n * n-1)) %% n-positive_integer_radius, nrow = n)
x = 0:(n-1) - positive_integer_radius
y = x
z = sqrt(positive_integer_radius * positive_integer_radius - index^2 - t(index)^2)
z[is.na(z)] = 0
postscript("perspective.eps", horizontal = FALSE, height = 4, width = 5, pointsize = 10)
persp(x, y, z, phi = 30, theta = 0, lphi = 60, ltheta = 70, border = NA, 
    col = "dark blue", shade = 1, scale = FALSE)
dev.off()
postscript("contour.eps", horizontal = FALSE, height = 4, width = 5, pointsize = 10)
contour(x, y, z)
dev.off()


Download: ../code-2010/hemisphere-R/hemisphere.R
In order to run this script on the cluster nodes, we will need a batch script
#!/bin/bash
#SBATCH --job-name=hemisphere
#SBATCH --output=slurm.out
#SBATCH --error=slurm.err
#SBATCH --partition=develop

R --no-save < hemisphere.R

Download: ../code-2010/hemisphere-R/run.slurm
To run that script, you must submit it to the scheduler as a job
[araim1@tara-fe1 hemisphere-R]$ sbatch run.slurm
sbatch: Submitted batch job 2623
[araim1@tara-fe1 hemisphere-R]$
Once your job completes, it should produce several files. If you list them, you should see something like this
[araim1@tara-fe1 hemisphere-R]$ ls
contour.eps   hemisphere.gif  perspective.eps  slurm.out
hemisphere.R  run.slurm       slurm.err
[araim1@tara-fe1 hemisphere-R]$ 
The slurm.err and slurm.out files contain everything that your job printed to its error and output streams, respectively. The two .eps files are the plots that your program generated. If you open the files in a .eps viewer, they should look like this

hemisphere shown in GIF format

You can download sample output files using these links

Using contributed libraries

The parallel R packages have currently only been tested for GCC + OpenMPI 1.3.3 p1. To try the examples, change your switcher settings accordingly, e.g.
[araim1@tara-fe1 ~]$ switcher mpi = gcc-openmpi-1.3.3-p1
[araim1@tara-fe1 ~]$ switcher_reload
Several R libraries have already been installed on the cluster for your use. To make them available to you, create a plain text file in your home directory called ".Renviron" and enter the following line into it
R_LIBS=/usr/cluster/contrib/gcc-openmpi-1.3.3/Rlibs
If for some reason you need to add another library path, you can use ":" to delimit them
R_LIBS=/usr/cluster/contrib/gcc-openmpi-1.3.3/Rlibs:/path/to/more/libs
To make sure you can access the contributed libraries, run the following in R. You should see a similar "contrib" entry in your .libPaths(), and there should be no errors when you try to load the two libraries.
[araim1@tara-fe1 ~]$ R

R version 2.11.1 (2010-05-31)
Copyright (C) 2010 The R Foundation for Statistical Computing
ISBN 3-900051-07-0
...

> .libPaths()
[1] "/usr/cluster/contrib/gcc-openmpi-1.3.3/Rlibs"
[2] "/usr/lib64/R/library"
> library(snow)
> library(Rmpi)

A SNOW example

Now we'll look at running parallel R jobs with SNOW. SNOW (Simple Network Of Workstations) provides a powerful programming model for solving "embarassingly parallel" problems with R. For more information about programming SNOW, see
SNOW Simplified or Luke Tierney's site.

First save the following R script to your account.

library(Rmpi)
library(snow)

# Initialize SNOW using MPI communication. The first line will get the
# number of MPI processes the scheduler assigned to us. Everything else 
# is standard SNOW

np <- mpi.universe.size()
cluster <- makeMPIcluster(np)

# Print the hostname for each cluster member
sayhello <- function()
{
    info <- Sys.info()[c("nodename", "machine")]
    paste("Hello from", info[1], "with CPU type", info[2])
}

names <- clusterCall(cluster, sayhello)
print(unlist(names))

# Compute row sums in parallel using all processes,
# then a grand sum at the end on the master process
parallelSum <- function(m, n)
{
    A <- matrix(rnorm(m*n), nrow = m, ncol = n)
    row.sums <- parApply(cluster, A, 1, sum)
    print(sum(row.sums))
}

parallelSum(500, 500)

stopCluster(cluster)
mpi.exit()


Download: ../code-2010/R-snow-mpi-test/snow-test.R
Notice that we first load the SNOW library, then we find our number of assigned MPI processes from an MPI call (using the Rmpi interface, introduced briefly in the next section). We set up a SNOW cluster from our assigned nodes, print out some information from the cluster members, and perform a few simple parallel calculations before stopping the SNOW cluster and exiting.

In order to run our code on the cluster, we'll need a batch script as usual

#!/bin/bash
#SBATCH --job-name=snow-mpi-test
#SBATCH --output=slurm.out
#SBATCH --error=slurm.err
#SBATCH --partition=develop
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=3

mpirun -np 1 R --no-save < snow-test.R

Download: ../code-2010/R-snow-mpi-test/run.slurm
Notice that we're requesting six processes in the develop queue, and launching R as a batch process. There is one notable difference from usual SLURM scripts here - we have only requested mpirun to launch a single process. That process will be responsible for asking MPI to start the remaining processes that we had requested. Now let's submit this script to the batch system.
[araim1@tara-fe1 snow-mpi-test]$ sbatch run.slurm
sbatch: Submitted batch job 2648
[araim1@tara-fe1 snow-mpi-test]$
Once the job completes, we should see something like the following in our slurm.out output (and slurm.err should be empty):
[araim1@tara-fe1 SNOW]$ cat slurm.out 

[... R disclaimer message ...]

> library(Rmpi)
> library(snow)
> 
> # Initialize SNOW using MPI communication. The first line will get the
> # number of MPI processes the scheduler assigned to us. Everything else 
> # is standard SNOW
> 
> np <- mpi.universe.size()
> cluster <- makeMPIcluster(np)
    8 slaves are spawned successfully. 0 failed.
> 
> # Print the hostname for each cluster member
> sayhello <- function()
+ {
+     info <- Sys.info()[c("nodename", "machine")]
+     paste("Hello from", info[1], "with CPU type", info[2])
+ }
> 
> names <- clusterCall(cluster, sayhello)
> print(unlist(names))
[1] "Hello from n1 with CPU type x86_64" "Hello from n1 with CPU type x86_64"
[3] "Hello from n1 with CPU type x86_64" "Hello from n2 with CPU type x86_64"
[5] "Hello from n2 with CPU type x86_64" "Hello from n2 with CPU type x86_64"
[7] "Hello from n2 with CPU type x86_64" "Hello from n1 with CPU type x86_64"
> 
> # Compute row sums in parallel using all processes,
> # then a grand sum at the end on the master process
> parallelSum <- function(m, n)
+ {
+     A <- matrix(rnorm(m*n), nrow = m, ncol = n)
+     row.sums <- parApply(cluster, A, 1, sum)
+     print(sum(row.sums))
+ }
> 
> parallelSum(500, 500)
[1] 13.40937
> 
> stopCluster(cluster)
[1] 1
> mpi.exit()
[1] "Detaching Rmpi. Rmpi cannot be used unless relaunching R."
>

An Rmpi example

Rmpi is another popular R package for parallel computing. It's a bit more involved to use than SNOW, but also offers more control. Note that Rmpi follows a slightly different paradigm than the usual MPI. In Rmpi there is a master process that spawns slaves to work in parallel, in contrast to the usual "single program multiple data" model. Rmpi features many familiar communications like "bcast" and "send" however. For more information about using Rmpi, a good place to check is the reference manual on CRAN. Also see this tutorial.

Hello example

To see Rmpi in action, we will run the following parallel "hello world" R script

library(Rmpi)
mpi.spawn.Rslaves(needlog = FALSE)

mpi.bcast.cmd( id <- mpi.comm.rank() )
mpi.bcast.cmd( np <- mpi.comm.size() )
mpi.bcast.cmd( host <- mpi.get.processor.name() )
result <- mpi.remote.exec(paste("I am", id, "of", np, "running on", host)) 

print(unlist(result))

mpi.close.Rslaves(dellog = FALSE)
mpi.exit()


Download: ../code-2010/Rmpi-test/hello.R
Note that if the option the option "dellog = FALSE" is not specified, a warning will be returned from OpenMPI when the Rmpi tries to clean up:
An MPI process has executed an operation involving a call to the "fork()" system call to create a child process. Open MPI is currently operating in a condition that could result in memory corruption or other system errors...
The option "needlog = FALSE" tells Rmpi not to create the extra log files associated with this cleanup. If you would like to see the log files along with your results, do not specify this option.
To run this code, we will use the following SLURM script, which launches only the master process initially, as in the SNOW example from earlier. When the master calls "mpi.spawn.Rslaves", the remaining MPI processes will be initialized.
#!/bin/bash
#SBATCH --job-name=Rmpi_hello
#SBATCH --output=slurm.out
#SBATCH --error=slurm.err
#SBATCH --partition=develop
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=4

mpirun -np 1 R --no-save < hello.R 

Download: ../code-2010/Rmpi-test/run.slurm
After submitting the job it should run to completion, producing an empty slurm.err file and the following slurm.out
[araim1@tara-fe1 Rmpi-test]$ cat slurm.out 

[... R disclaimer message ...]

> library(Rmpi)
> mpi.spawn.Rslaves(needlog = FALSE)
    8 slaves are spawned successfully. 0 failed.
master (rank 0, comm 1) of size 9 is running on: n1 
slave1 (rank 1, comm 1) of size 9 is running on: n1 
slave2 (rank 2, comm 1) of size 9 is running on: n1 
slave3 (rank 3, comm 1) of size 9 is running on: n1 
slave4 (rank 4, comm 1) of size 9 is running on: n2 
slave5 (rank 5, comm 1) of size 9 is running on: n2 
slave6 (rank 6, comm 1) of size 9 is running on: n2 
slave7 (rank 7, comm 1) of size 9 is running on: n2 
slave8 (rank 8, comm 1) of size 9 is running on: n1 
> 
> mpi.bcast.cmd( id <- mpi.comm.rank() )
> mpi.bcast.cmd( np <- mpi.comm.size() )
> mpi.bcast.cmd( host <- mpi.get.processor.name() )
> result <- mpi.remote.exec(paste("I am", id, "of", np, "running on", host)) 
> 
> print(unlist(result))
                     slave1                      slave2 
"I am 1 of 9 running on n1" "I am 2 of 9 running on n1" 
                     slave3                      slave4 
"I am 3 of 9 running on n1" "I am 4 of 9 running on n2" 
                     slave5                      slave6 
"I am 5 of 9 running on n2" "I am 6 of 9 running on n2" 
                     slave7                      slave8 
"I am 7 of 9 running on n2" "I am 8 of 9 running on n1" 
> 
> mpi.close.Rslaves(dellog = FALSE)
[1] 1
> mpi.exit()
[1] "Detaching Rmpi. Rmpi cannot be used unless relaunching R."
>

Gather & reduce example

Let's look at a slightly more interesting example of Rmpi, using the "gather" and "reduce" communications. Notice especially that the master process must be involved in the communication, which is invoked separately from the slaves. (If you forget to include the master, the program will hang until it is killed). Here is the R code
library(Rmpi)

mpi.spawn.Rslaves(needlog = FALSE)

mpi.bcast.cmd( id <- mpi.comm.rank() )
mpi.bcast.cmd( np <- mpi.comm.size() )
mpi.bcast.cmd( host <- mpi.get.processor.name() )
result <- mpi.remote.exec(paste("I am", id, "of", np, "running on", host)) 
print(unlist(result))

# Sample one normal observation on the master and each slave
x <- rnorm(1)
mpi.bcast.cmd(x <- rnorm(1))

# Gather the entire x vector (by default to process 0, the master)
mpi.bcast.cmd(mpi.gather.Robj(x))
y <- mpi.gather.Robj(x)
print(unlist(y))

# Sum the x vector together, storing the result on process 0 by default
mpi.bcast.cmd(mpi.reduce(x, op = "sum"))
z <- mpi.reduce(x, op = "sum")
print(z)

mpi.close.Rslaves(dellog = FALSE)
mpi.exit()

Download: ../code-2010/Rmpi-comm-test/driver.R
The submission script is similar to the previous example
#!/bin/bash
#SBATCH --job-name=Rmpi_test
#SBATCH --output=slurm.out
#SBATCH --error=slurm.err
#SBATCH --partition=develop
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=8

mpirun -np 1 R --no-save < driver.R

Download: ../code-2010/Rmpi-comm-test/run.slurm
And here are the results
[araim1@tara-fe1 Rmpi-comm-test]$ cat slurm.out 

[... R disclaimer message ...]

> library(Rmpi)
> 
> mpi.spawn.Rslaves(needlog = FALSE)
    8 slaves are spawned successfully. 0 failed.
master (rank 0, comm 1) of size 9 is running on: n1 
slave1 (rank 1, comm 1) of size 9 is running on: n1 
... ... ...
slave8 (rank 8, comm 1) of size 9 is running on: n1 
> 
> mpi.bcast.cmd( id <- mpi.comm.rank() )
> mpi.bcast.cmd( np <- mpi.comm.size() )
> mpi.bcast.cmd( host <- mpi.get.processor.name() )
> result <- mpi.remote.exec(paste("I am", id, "of", np, "running on", host)) 
> print(unlist(result))
                     slave1                      slave2 
"I am 1 of 9 running on n1" "I am 2 of 9 running on n1" 
                     slave3                      slave4 
"I am 3 of 9 running on n1" "I am 4 of 9 running on n1" 
                     slave5                      slave6 
"I am 5 of 9 running on n1" "I am 6 of 9 running on n1" 
                     slave7                      slave8 
"I am 7 of 9 running on n1" "I am 8 of 9 running on n1" 
> 
> # Sample one normal observation on the master and each slave
> x <- rnorm(1)
> mpi.bcast.cmd(x <- rnorm(1))
> 
> # Gather the entire x vector (by default to process 0, the master)
> mpi.bcast.cmd(mpi.gather.Robj(x))
> y <- mpi.gather.Robj(x)
> print(unlist(y))
[1] -2.6498050  0.5241441 -0.6747354  0.5915066  0.7660781  0.3608518 -2.7048508
[8] -0.4686277  0.5241441
> 
> # Sum the x vector together, storing the result on process 0 by default
> mpi.bcast.cmd(mpi.reduce(x, op = "sum"))
> z <- mpi.reduce(x, op = "sum")
> print(z)
[1] -3.731294
> 
> mpi.close.Rslaves(dellog = FALSE)
[1] 1
> mpi.exit()
[1] "Detaching Rmpi. Rmpi cannot be used unless relaunching R."
>