In [ ]:
big.data.path <- Sys.getenv("ACADEMYR_BIG_DATA_PATH")
data.path <- "../data"
output.path <- "./output/xdf"
if(!file.exists(output.path)) dir.create(output.path, recursive = TRUE)
sample.data.dir <- rxGetOption("sampleDataDir")

Let's compare the performance of HPA functions with their open-source R couterparts:

  • rxLinMod vs lm
  • rxLogit vs glm
  • rxGlm vs glm
  • rxSummary vs dplyr

We will read the airline.csv data set and load it into R as a data.frame (since open-source R functions do not work on XDF files, we won't be using XDF for these tests).


In [ ]:
airline.csv <- file.path(sample.data.dir, "AirlineDemoSmall.csv")

colInfo <- list(
  DayOfWeek = list(
    type = "factor",
    levels = c("Monday", "Tuesday", "Wednesday",
               "Thursday", "Friday", "Saturday", "Sunday"))
)

airline.df <- rxImport(inData = airline.csv, # no outFile means we get a data.frame
                       colInfo = colInfo,
                       missingValueString = "M")

dim(airline.df)

rxLinMod vs lm


In [ ]:
model <- lm(ArrDelay ~ DayOfWeek, data = airline.df)
summary(model)

model <- rxLinMod(ArrDelay ~ DayOfWeek, data = airline.df, dropFirst = TRUE)
summary(model)

In [ ]:
library(microbenchmark)
print(microbenchmark(
  lm(ArrDelay ~ DayOfWeek, data = airline.df),
  rxLinMod(ArrDelay ~ DayOfWeek, data = airline.df, dropFirst = TRUE, reportProgress = 0),
  times = 30
))

rxLogit vs glm


In [ ]:
parallel <- function() rxLogit(ArrDelay > 10 ~ DayOfWeek, data = airline.df, dropFirst = TRUE, reportProgress = 0)
sequentl <- function() glm(ArrDelay > 10 ~ DayOfWeek, data = airline.df, family = binomial(link = 'logit'))

print(microbenchmark(
  parallel(),
  sequentl(),
  times = 10
))

rxSummary vs dplyr


In [ ]:
revo_sum <- function() rxSummary(ArrDelay ~ F(DayOfWeek), data = airline.df, reportProgress = 0)

library(dplyr)

dplyrsum <- function() {
  airline.df %>%
    group_by(DayOfWeek) %>%
    summarise(
      Means = mean(ArrDelay, na.rm = TRUE),
      StdDev = sd(ArrDelay, na.rm = TRUE),
      Min = min(ArrDelay, na.rm = TRUE),
      Max = max(ArrDelay, na.rm = TRUE),
      ValidObs = sum(!is.na(ArrDelay))
    )
}

print(microbenchmark(
  revo_sum(),
  dplyrsum(),
  times = 100
))

rxGlm vs glm on small data set: glm wins


In [ ]:
library(robust)
data(breslow.dat, package = "robust")
dim(breslow.dat)

parallel <- function() rxGlm(sumY ~ Base + Age + Trt, dropFirst = TRUE, data = breslow.dat, family = poisson(), reportProgress = 0)
sequentl <- function() glm(sumY ~ Base + Age + Trt, data = breslow.dat, family = poisson())

# smaller data set means parallel algorithm is not necessarily faster
print(microbenchmark(
  parallel(),
  sequentl(),
  times = 10
))

rxGlm vs glm on large data set: rxGlm wins


In [ ]:
# we blow up the size of the data 3000 fold
breslow.big <- do.call(rbind, lapply(1:3000, function(i) breslow.dat))
dim(breslow.big)

parallel <- function() rxGlm(sumY ~ Base + Age + Trt, dropFirst = TRUE, data = breslow.big, family = poisson(), reportProgress = 0)
sequentl <- function() glm(sumY ~ Base + Age + Trt, data = breslow.big, family = poisson())

# smaller data set means parallel algorithm is not necessarily faster
print(microbenchmark(
  parallel(),
  sequentl(),
  times = 10
))

rm(breslow.big)

comparing RxLocalParallel with RxLocalSeq


In [ ]:
rxSetComputeContext(RxLocalParallel())
rxOptions(numCoresToUse = 12)

rxExec(sqrt, rxElemArg(1:4))
# rxElemArg allows you to pass different arguments to each worker
rxExec(sqrt, 1:4, timesToRun = 4)
# timesToRun calculates the square roots of the entire sequence 1:4 four times

In [ ]:
nsize <- 10^5
system.time(rnorm(nsize))

rxSetComputeContext(RxLocalSeq())
system.time(rxExec(function(i) rnorm(nsize), rxElemArg(1:4)))

rxSetComputeContext(RxLocalParallel())
system.time(rxExec(function(i) rnorm(nsize), rxElemArg(1:4), execObjects = "nsize"))

In [ ]:
compare.runtimes <- function(nsize, nproc) {
  cat(sprintf("size = %s \n", formatC(nsize, format = "d", big.mark = ",")))
  st1 <- system.time(rnorm(nsize))

  rxSetComputeContext(RxLocalSeq())
  st2 <- system.time(rxExec(function(i) rnorm(nsize), rxElemArg(1:nproc)))

  rxSetComputeContext(RxLocalParallel())
  st3 <- system.time(rxExec(function(i) rnorm(nsize), rxElemArg(1:nproc), execObjects = c("nsize", "nproc")))

  dd <- do.call(rbind, list(st1, st2, st3))
  dd <- as.data.frame(dd)
  dd$test <- c('single test', 'four sequential', 'four parallel')
  dd$factor <- dd$elapsed / lag(dd$elapsed)
  dd[ , c('test', 'elapsed', 'factor')]
}

compare.runtimes(10^4, 16)
compare.runtimes(10^5, 16)
compare.runtimes(10^6, 16)
compare.runtimes(10^7, 16)

The mandelbrot example:

A complex number is in the mandelbrot set if the following loop converges: $z_{n+1} = z^2_n + c$ where $z_0 = 0$.


In [ ]:
mandelbrot <- function(x0, y0, lim) {
  x <- x0; y <- y0
  iter <- 0
  while (x^2 + y^2 < 4 && iter < lim) {
    xtemp <- x^2 - y^2 + x0
    y <- 2 * x * y + y0
    x <- xtemp
    iter <- iter + 1
  }
  iter
}

mandelbrot(0, 0, 50)
mandelbrot(2, 5, 50)

In [ ]:
vmandelbrot <- function(xvec, y0, lim) {
  sapply(xvec, mandelbrot, y0 = y0, lim = lim)
}

vmandelbrot(0:10, 0, 50)

In [ ]:
size <- 150
x.in <- seq(-2.0, 0.6, length.out = size)
y.in <- seq(-1.3, 1.3, length.out = size)
m <- 100
z <- rxExec(vmandelbrot, x.in, y0 = rxElemArg(y.in), m, execObjects = "mandelbrot")
z <- matrix(unlist(z), ncol = size) # order the data for the image

image(x.in, y.in, z, col = c(rainbow(m), '#000000'))

foreach vs RevoScaleR


In [ ]:
library(foreach)
foreach(i = 4:6) %do% sqrt(i) # sequentially
foreach(i = 4:6) %dopar% sqrt(i) # parallel

rxSetComputeContext(RxLocalSeq())
rxExec(sqrt, elemArgs = 4:6) # sequentially

rxSetComputeContext(RxLocalParallel())
rxExec(sqrt, elemArgs = 4:6) # in parallel

a parallel backend for kmeans


In [ ]:
# Create artificial data
set.seed(1)
X <- rbind(
  matrix(rnorm(100, mean = 0, sd = 0.3), ncol = 2),
  matrix(rnorm(100, mean = 1, sd = 0.3), ncol = 2)
)
colnames(X) <- c("x", "y")
plot(X)

In [ ]:
kmeans(X, 5)

In [ ]:
clusterPlot <- function(x, n = 4, nstart = 1){
  cl <- kmeans(x, n, nstart = nstart)
  plot(x, col = cl$cluster)
  points(cl$centers, col = 1:n, pch = 8, cex = 2)
}

par(mfrow=c(1, 2))
clusterPlot(X)
clusterPlot(X)
par(mfrow=c(1, 1))

In [ ]:
clusterPlot(X, n = 5, nstart = 25)

In [ ]:
rxSetComputeContext(RxLocalParallel())

numTimes <- 8
results <- rxExec(kmeans, X, centers = 5, iter.max = 35, nstart = 50, timesToRun = numTimes, elemType = "cores")

(sumSSW <- vapply(results, function(x) sum(x$withinss), FUN.VALUE = numeric(1)))
results[[which.min(sumSSW)]]

In [ ]:
kMeansRSR <- function(x, centers = 5, iter.max = 10, nstart = 1, numTimes = 20) {
  results <- rxExec(FUN = kmeans, x = x, centers = centers, iter.max = iter.max,
                    nstart = nstart, elemType = "cores", timesToRun = numTimes)
  sumSSW <- vapply(results, function(x) sum(x$withinss), FUN.VALUE = numeric(1))
  results[[which.min(sumSSW)]]
}

# create 5000 x 50 matrix
nrow <- 5000
ncol <- 50
Z <- matrix(rnorm(nrow*ncol), nrow, ncol)
iter.max <- 50
workers <- 8

nstart <- 32
(km1st <- system.time(km1 <- kmeans(Z, 10, iter.max, nstart)))
(km8st <- system.time(kmrsr <- kMeansRSR(Z, 10, iter.max, nstart = nstart/(2*workers), numTimes = 2*workers)))

cross-validation with a parallel backend


In [ ]:
airline.xdf <- file.path(output.path, "airline.xdf")

colClasses <- c(ArrDelay = "numeric", CRSDepTime = "numeric", DayOfWeek = "factor")
rxImport(airline.csv, airline.xdf, colClasses = colClasses, overwrite = TRUE, reportProgress = 0)

airline.scored.xdf <- file.path(output.path, "airline_scored.xdf")

k <- 10
rxDataStep(inData = airline.xdf,
           outFile = airline.xdf,
           transforms = list(
             kSplits = factor(sample(LETTERS[1:k], size = .rxNumRows, replace = TRUE))),
           transformObjects = list(LETTERS = LETTERS, k = k),
           append = "rows",
           overwrite = FALSE, reportProgress = 0)

# split the data by each fold
kSplits <- rxSplit(inData = airline.xdf,
                   outFilesBase = file.path(output.path, "airline"),
                   splitByFactor = "kSplits", overwrite = TRUE, reportProgress = 0)

# for each fold:
# run `rxLinMod` on the other k-1 folds
# predict on the k-th fold based on the model developed above

myLinModWrapper <- function(
  holdoutlevel, # letters A-Z, one for each fold
  splitFiles, # a list of file names, one for each data split
  outFile = NULL # an optional data path, to combine all the splits into a single file
) {

  # first, estimate the model on all data point but those including holdoutlevel
  myMod <- rxLinMod(ArrDelay ~ DayOfWeek + F(CRSDepTime), data = airline.xdf,
                    rowSelection = kSplits != holdout,
                    transformObjects = list(holdout = holdoutlevel),
                    reportProgress = 0)

  # then, generate predictions
  curHoldOut <- grep(paste("kSplits", holdoutlevel, "xdf", sep = "."), names(splitFiles), value = TRUE)
  rxPredict(myMod, data = splitFiles[[curHoldOut]], overwrite = TRUE, predVarNames = "ArrDelay_kFold_Pred", reportProgress = 0)

 return(holdoutlevel)
}

# we can run this sequentially using `lapply`
system.time(lapply(LETTERS[1:k], myLinModWrapper, splitFiles = kSplits))

# we can run it sequentially using `rxExec`
rxSetComputeContext(RxLocalSeq())
system.time(rxExec(myLinModWrapper, splitFiles = kSplits, elemArgs = as.list(LETTERS[1:k])))

# we can run it in parallel using `rxExec`
rxSetComputeContext(RxLocalParallel())
system.time(rxExec(myLinModWrapper, splitFiles = kSplits, elemArgs = as.list(LETTERS[1:k]), execObjects = c('airline.xdf', 'airline.scored.xdf', 'output.path'), elemType = "cores"))