Using SparkR with Anaconda

Setup:

  1. Install r-essentials on all cluster nodes via acluster conda install r-essentials -c r

  2. Install IRkernel on the head node (https://github.com/IRkernel/IRkernel)

  3. On all nodes, add a symbolic link at /usr/bin/Rscript pointing to /opt/anaconda/bin/Rscript (e.g., ln -s /opt/anaconda/bin/Rscript /usr/bin/Rscript) so that Spark uses the Anaconda Rscript

Set the SPARK_HOME path and import the SparkR library


In [1]:
Sys.setenv(SPARK_HOME='/opt/anaconda/share/spark')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))

In [2]:
library(SparkR)


Attaching package: 'SparkR'

The following objects are masked from 'package:stats':

    filter, na.omit

The following objects are masked from 'package:base':

    intersect, rbind, sample, subset, summary, table, transform

Initialize the SparkContext


In [3]:
sc <- sparkR.init("spark://ip-172-31-9-200:7077")


Launching java with spark-submit command /opt/anaconda/share/spark/bin/spark-submit   sparkr-shell /tmp/Rtmp7CVgW6/backend_port226960341be9 

In [4]:
sqlContext <- sparkRSQL.init(sc)
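
sparkR.init also accepts application settings such as an application name and Spark properties; a sketch of that variant is shown below (the name and memory setting are illustrative, not part of the original run):

sc <- sparkR.init("spark://ip-172-31-9-200:7077",
                  appName = "sparkr-anaconda-demo",                  # hypothetical application name
                  sparkEnvir = list(spark.executor.memory = "2g"))   # illustrative Spark property
sqlContext <- sparkRSQL.init(sc)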

Create a SparkR DataFrame using the faithful dataset from R


In [5]:
df <- createDataFrame(sqlContext, faithful)

In [6]:
df


Out[6]:
DataFrame[eruptions:double, waiting:double]
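
A SparkR DataFrame is distributed across the cluster; if needed, it can be pulled back to the driver as a local R data.frame with collect() (a small sketch, not part of the original run):

local_df <- collect(df)   # local R data.frame; faithful has 272 rows
dim(local_df)             # 272 x 2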

In [7]:
head(select(df, df$eruptions))


Out[7]:
  eruptions
1     3.6
2     1.8
3     3.333
4     2.283
5     4.533
6     2.883

In [8]:
head(filter(df, df$waiting < 50))


Out[8]:
  eruptions waiting
1     1.75       47
2     1.75       47
3     1.867      48
4     1.75       48
5     2.167      48
6     2.1        49

In [9]:
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))


Out[9]:
  waiting count
1      81    13
2      60     6
3      93     2
4      68     1
5      47     4
6      80     8

In [10]:
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))


Out[10]:
  waiting count
1      78    15
2      83    14
3      81    13
4      77    12
5      82    12
6      79    10

In [11]:
df$waiting_secs <- df$waiting * 60
head(df)


Out[11]:
  eruptions waiting waiting_secs
1     3.6        79         4740
2     1.8        54         3240
3     3.333      74         4440
4     2.283      62         3720
5     4.533      85         5100
6     2.883      55         3300
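
The same derived column can also be added with withColumn, which returns a new DataFrame rather than modifying df in place (an equivalent sketch, not part of the original run):

df2 <- withColumn(df, "waiting_secs", df$waiting * 60)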

Load JSON data from HDFS


In [12]:
people <- read.df(sqlContext, "mock_data.json", "json")
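
read.df resolves a relative path against the cluster's default filesystem (HDFS in this setup); a fully qualified URI can also be passed (the namenode host and path below are hypothetical):

people <- read.df(sqlContext, "hdfs://namenode:8020/user/ubuntu/mock_data.json", "json")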

In [13]:
count(people)


Out[13]:
1000

In [14]:
head(people)


Out[14]:
                             email first_name gender id      ip_address last_name
1                  areyes0@home.pl     Arthur   Male  1 163.138.188.169     Reyes
2 smorales1@scientificamerican.com     Samuel   Male  2   237.78.10.190   Morales
3             jfisher2@samsung.com      Julia Female  3   2.149.116.117    Fisher
4                 kreid3@prlog.org      Kathy Female  4   14.165.16.179      Reid
5            jjackson4@addthis.com     Jeremy   Male  5  70.140.102.156   Jackson
6             emills5@google.co.uk     Ernest   Male  6   182.51.21.159     Mills

In [15]:
printSchema(people)


root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- last_name: string (nullable = true)

Running SQL queries from SparkR


In [16]:
registerTempTable(people, "people")

In [17]:
males <- sql(sqlContext, "SELECT first_name FROM people WHERE gender = 'Male'")
head(males)


Out[17]:
  first_name
1     Arthur
2     Samuel
3     Jeremy
4     Ernest
5      Louis
6    William

In [18]:
females <- sql(sqlContext, "SELECT first_name FROM people WHERE gender = 'Female'")
head(females)


Out[18]:
  first_name
1      Julia
2      Kathy
3        Ann
4      Debra
5       Rose
6   Kathleen
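
The result of sql() is itself a SparkR DataFrame, so the usual DataFrame operations apply to the query results (a sketch, not part of the original run):

count(males)     # number of rows with gender = 'Male'
count(females)   # number of rows with gender = 'Female'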

Machine learning


In [19]:
df <- createDataFrame(sqlContext, iris)


Warning message:
In FUN(X[[i]], ...): Use Sepal_Length instead of Sepal.Length as column name
Warning message:
In FUN(X[[i]], ...): Use Sepal_Width instead of Sepal.Width as column name
Warning message:
In FUN(X[[i]], ...): Use Petal_Length instead of Petal.Length as column name
Warning message:
In FUN(X[[i]], ...): Use Petal_Width instead of Petal.Width as column name

In [20]:
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

In [21]:
summary(model)


Out[21]:
$coefficients
                     Estimate
(Intercept)         2.251393
Sepal_Width         0.8035609
Species__versicolor 1.458743
Species__virginica  1.946817

In [22]:
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))


Out[22]:
  Sepal_Length prediction
1          5.1   5.063856
2          4.9   4.662076
3          4.7   4.822788
4          4.6   4.742432
5          5.0   5.144212
6          5.4   5.385281
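
When the analysis is finished, the SparkContext can be shut down (not shown in the original notebook):

sparkR.stop()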