In [ ]:
big.data.path <- Sys.getenv("ACADEMYR_BIG_DATA_PATH")
data.path <- "../data"
output.path <- "./output/xdf"
if(!file.exists(output.path)) dir.create(output.path, recursive = TRUE)
sample.data.dir <- rxGetOption("sampleDataDir")
Let's compare the performance of HPA functions with their open-source R couterparts:
rxLinMod vs lmrxLogit vs glmrxGlm vs glmrxSummary vs dplyrWe will read the airline.csv data set and load it into R as a data.frame (since open-source R functions do not work on XDF files, we won't be using XDF for these tests).
In [ ]:
airline.csv <- file.path(sample.data.dir, "AirlineDemoSmall.csv")
colInfo <- list(
DayOfWeek = list(
type = "factor",
levels = c("Monday", "Tuesday", "Wednesday",
"Thursday", "Friday", "Saturday", "Sunday"))
)
airline.df <- rxImport(inData = airline.csv, # no outFile means we get a data.frame
colInfo = colInfo,
missingValueString = "M")
dim(airline.df)
In [ ]:
model <- lm(ArrDelay ~ DayOfWeek, data = airline.df)
summary(model)
model <- rxLinMod(ArrDelay ~ DayOfWeek, data = airline.df, dropFirst = TRUE)
summary(model)
In [ ]:
library(microbenchmark)
print(microbenchmark(
lm(ArrDelay ~ DayOfWeek, data = airline.df),
rxLinMod(ArrDelay ~ DayOfWeek, data = airline.df, dropFirst = TRUE, reportProgress = 0),
times = 30
))
In [ ]:
parallel <- function() rxLogit(ArrDelay > 10 ~ DayOfWeek, data = airline.df, dropFirst = TRUE, reportProgress = 0)
sequentl <- function() glm(ArrDelay > 10 ~ DayOfWeek, data = airline.df, family = binomial(link = 'logit'))
print(microbenchmark(
parallel(),
sequentl(),
times = 10
))
In [ ]:
revo_sum <- function() rxSummary(ArrDelay ~ F(DayOfWeek), data = airline.df, reportProgress = 0)
library(dplyr)
dplyrsum <- function() {
airline.df %>%
group_by(DayOfWeek) %>%
summarise(
Means = mean(ArrDelay, na.rm = TRUE),
StdDev = sd(ArrDelay, na.rm = TRUE),
Min = min(ArrDelay, na.rm = TRUE),
Max = max(ArrDelay, na.rm = TRUE),
ValidObs = sum(!is.na(ArrDelay))
)
}
print(microbenchmark(
revo_sum(),
dplyrsum(),
times = 100
))
In [ ]:
library(robust)
data(breslow.dat, package = "robust")
dim(breslow.dat)
parallel <- function() rxGlm(sumY ~ Base + Age + Trt, dropFirst = TRUE, data = breslow.dat, family = poisson(), reportProgress = 0)
sequentl <- function() glm(sumY ~ Base + Age + Trt, data = breslow.dat, family = poisson())
# smaller data set means parallel algorithm is not necessarily faster
print(microbenchmark(
parallel(),
sequentl(),
times = 10
))
In [ ]:
# we blow up the size of the data 3000 fold
breslow.big <- do.call(rbind, lapply(1:3000, function(i) breslow.dat))
dim(breslow.big)
parallel <- function() rxGlm(sumY ~ Base + Age + Trt, dropFirst = TRUE, data = breslow.big, family = poisson(), reportProgress = 0)
sequentl <- function() glm(sumY ~ Base + Age + Trt, data = breslow.big, family = poisson())
# smaller data set means parallel algorithm is not necessarily faster
print(microbenchmark(
parallel(),
sequentl(),
times = 10
))
rm(breslow.big)
In [ ]:
rxSetComputeContext(RxLocalParallel())
rxOptions(numCoresToUse = 12)
rxExec(sqrt, rxElemArg(1:4))
# rxElemArg allows you to pass different arguments to each worker
rxExec(sqrt, 1:4, timesToRun = 4)
# timesToRun calculates the square roots of the entire sequence 1:4 four times
In [ ]:
nsize <- 10^5
system.time(rnorm(nsize))
rxSetComputeContext(RxLocalSeq())
system.time(rxExec(function(i) rnorm(nsize), rxElemArg(1:4)))
rxSetComputeContext(RxLocalParallel())
system.time(rxExec(function(i) rnorm(nsize), rxElemArg(1:4), execObjects = "nsize"))
In [ ]:
compare.runtimes <- function(nsize, nproc) {
cat(sprintf("size = %s \n", formatC(nsize, format = "d", big.mark = ",")))
st1 <- system.time(rnorm(nsize))
rxSetComputeContext(RxLocalSeq())
st2 <- system.time(rxExec(function(i) rnorm(nsize), rxElemArg(1:nproc)))
rxSetComputeContext(RxLocalParallel())
st3 <- system.time(rxExec(function(i) rnorm(nsize), rxElemArg(1:nproc), execObjects = c("nsize", "nproc")))
dd <- do.call(rbind, list(st1, st2, st3))
dd <- as.data.frame(dd)
dd$test <- c('single test', 'four sequential', 'four parallel')
dd$factor <- dd$elapsed / lag(dd$elapsed)
dd[ , c('test', 'elapsed', 'factor')]
}
compare.runtimes(10^4, 16)
compare.runtimes(10^5, 16)
compare.runtimes(10^6, 16)
compare.runtimes(10^7, 16)
In [ ]:
mandelbrot <- function(x0, y0, lim) {
x <- x0; y <- y0
iter <- 0
while (x^2 + y^2 < 4 && iter < lim) {
xtemp <- x^2 - y^2 + x0
y <- 2 * x * y + y0
x <- xtemp
iter <- iter + 1
}
iter
}
mandelbrot(0, 0, 50)
mandelbrot(2, 5, 50)
In [ ]:
vmandelbrot <- function(xvec, y0, lim) {
sapply(xvec, mandelbrot, y0 = y0, lim = lim)
}
vmandelbrot(0:10, 0, 50)
In [ ]:
size <- 150
x.in <- seq(-2.0, 0.6, length.out = size)
y.in <- seq(-1.3, 1.3, length.out = size)
m <- 100
z <- rxExec(vmandelbrot, x.in, y0 = rxElemArg(y.in), m, execObjects = "mandelbrot")
z <- matrix(unlist(z), ncol = size) # order the data for the image
image(x.in, y.in, z, col = c(rainbow(m), '#000000'))
In [ ]:
library(foreach)
foreach(i = 4:6) %do% sqrt(i) # sequentially
foreach(i = 4:6) %dopar% sqrt(i) # parallel
rxSetComputeContext(RxLocalSeq())
rxExec(sqrt, elemArgs = 4:6) # sequentially
rxSetComputeContext(RxLocalParallel())
rxExec(sqrt, elemArgs = 4:6) # in parallel
In [ ]:
# Create artificial data
set.seed(1)
X <- rbind(
matrix(rnorm(100, mean = 0, sd = 0.3), ncol = 2),
matrix(rnorm(100, mean = 1, sd = 0.3), ncol = 2)
)
colnames(X) <- c("x", "y")
plot(X)
In [ ]:
kmeans(X, 5)
In [ ]:
clusterPlot <- function(x, n = 4, nstart = 1){
cl <- kmeans(x, n, nstart = nstart)
plot(x, col = cl$cluster)
points(cl$centers, col = 1:n, pch = 8, cex = 2)
}
par(mfrow=c(1, 2))
clusterPlot(X)
clusterPlot(X)
par(mfrow=c(1, 1))
In [ ]:
clusterPlot(X, n = 5, nstart = 25)
In [ ]:
rxSetComputeContext(RxLocalParallel())
numTimes <- 8
results <- rxExec(kmeans, X, centers = 5, iter.max = 35, nstart = 50, timesToRun = numTimes, elemType = "cores")
(sumSSW <- vapply(results, function(x) sum(x$withinss), FUN.VALUE = numeric(1)))
results[[which.min(sumSSW)]]
In [ ]:
kMeansRSR <- function(x, centers = 5, iter.max = 10, nstart = 1, numTimes = 20) {
results <- rxExec(FUN = kmeans, x = x, centers = centers, iter.max = iter.max,
nstart = nstart, elemType = "cores", timesToRun = numTimes)
sumSSW <- vapply(results, function(x) sum(x$withinss), FUN.VALUE = numeric(1))
results[[which.min(sumSSW)]]
}
# create 5000 x 50 matrix
nrow <- 5000
ncol <- 50
Z <- matrix(rnorm(nrow*ncol), nrow, ncol)
iter.max <- 50
workers <- 8
nstart <- 32
(km1st <- system.time(km1 <- kmeans(Z, 10, iter.max, nstart)))
(km8st <- system.time(kmrsr <- kMeansRSR(Z, 10, iter.max, nstart = nstart/(2*workers), numTimes = 2*workers)))
In [ ]:
airline.xdf <- file.path(output.path, "airline.xdf")
colClasses <- c(ArrDelay = "numeric", CRSDepTime = "numeric", DayOfWeek = "factor")
rxImport(airline.csv, airline.xdf, colClasses = colClasses, overwrite = TRUE, reportProgress = 0)
airline.scored.xdf <- file.path(output.path, "airline_scored.xdf")
k <- 10
rxDataStep(inData = airline.xdf,
outFile = airline.xdf,
transforms = list(
kSplits = factor(sample(LETTERS[1:k], size = .rxNumRows, replace = TRUE))),
transformObjects = list(LETTERS = LETTERS, k = k),
append = "rows",
overwrite = FALSE, reportProgress = 0)
# split the data by each fold
kSplits <- rxSplit(inData = airline.xdf,
outFilesBase = file.path(output.path, "airline"),
splitByFactor = "kSplits", overwrite = TRUE, reportProgress = 0)
# for each fold:
# run `rxLinMod` on the other k-1 folds
# predict on the k-th fold based on the model developed above
myLinModWrapper <- function(
holdoutlevel, # letters A-Z, one for each fold
splitFiles, # a list of file names, one for each data split
outFile = NULL # an optional data path, to combine all the splits into a single file
) {
# first, estimate the model on all data point but those including holdoutlevel
myMod <- rxLinMod(ArrDelay ~ DayOfWeek + F(CRSDepTime), data = airline.xdf,
rowSelection = kSplits != holdout,
transformObjects = list(holdout = holdoutlevel),
reportProgress = 0)
# then, generate predictions
curHoldOut <- grep(paste("kSplits", holdoutlevel, "xdf", sep = "."), names(splitFiles), value = TRUE)
rxPredict(myMod, data = splitFiles[[curHoldOut]], overwrite = TRUE, predVarNames = "ArrDelay_kFold_Pred", reportProgress = 0)
return(holdoutlevel)
}
# we can run this sequentially using `lapply`
system.time(lapply(LETTERS[1:k], myLinModWrapper, splitFiles = kSplits))
# we can run it sequentially using `rxExec`
rxSetComputeContext(RxLocalSeq())
system.time(rxExec(myLinModWrapper, splitFiles = kSplits, elemArgs = as.list(LETTERS[1:k])))
# we can run it in parallel using `rxExec`
rxSetComputeContext(RxLocalParallel())
system.time(rxExec(myLinModWrapper, splitFiles = kSplits, elemArgs = as.list(LETTERS[1:k]), execObjects = c('airline.xdf', 'airline.scored.xdf', 'output.path'), elemType = "cores"))