Setup:
Install r-essentials on all cluster nodes via acluster conda install r-essentials -c r
Install IRkernel on the head node (https://github.com/IRkernel/IRkernel)
Add a symbolic link at /usr/bin/Rscript pointing to /opt/anaconda/bin/Rscript on all nodes
In [1]:
# Point SparkR at the local Spark installation, then prepend Spark's bundled
# R library directory to the search path so library(SparkR) resolves.
Sys.setenv(SPARK_HOME = "/opt/anaconda/share/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
In [2]:
library(SparkR)
In [3]:
# Initialize the SparkR context against the standalone cluster master.
# NOTE(review): hard-coded cluster-internal master URL — update per deployment.
sc <- sparkR.init("spark://ip-172-31-9-200:7077")
In [4]:
# Create a SQLContext from the SparkContext; required for DataFrame and SQL APIs.
sqlContext <- sparkRSQL.init(sc)
In [5]:
# Distribute R's built-in 'faithful' (Old Faithful geyser) data.frame
# as a Spark DataFrame.
df <- createDataFrame(sqlContext, faithful)
In [6]:
# Display the DataFrame object (shows its schema summary, not the row data).
df
Out[6]:
In [7]:
# Select only the 'eruptions' column; head() collects the first rows to the driver.
head(select(df, df$eruptions))
Out[7]:
In [8]:
# Keep only rows whose waiting time is under 50; preview the first few.
head(filter(df, df$waiting < 50))
Out[8]:
In [9]:
# Group rows by waiting time and count how often each value occurs.
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
Out[9]:
In [10]:
# Same group-and-count as above, kept in a variable so it can be sorted:
# arrange descending by count to surface the most common waiting times.
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Out[10]:
In [11]:
# Derive a new column: waiting time scaled by 60 (minutes -> seconds).
df$waiting_secs <- df$waiting * 60
head(df)
Out[11]:
In [12]:
# Load a JSON file into a Spark DataFrame using the "json" data source.
# NOTE(review): relative path — presumably resolved against the Spark
# working directory; verify where mock_data.json lives on the cluster.
people <- read.df(sqlContext, "mock_data.json", "json")
In [13]:
# Row count of the people DataFrame (triggers a Spark job).
count(people)
Out[13]:
In [14]:
# Preview the first rows of the people DataFrame.
head(people)
Out[14]:
In [15]:
# Print the DataFrame's schema (column names and types).
printSchema(people)
In [16]:
# Register the DataFrame as a temporary SQL table named "people"
# so it can be queried via sql() below.
registerTempTable(people, "people")
In [17]:
# Spark SQL query against the registered temp table: first names of male records.
males <- sql(sqlContext, "SELECT first_name FROM people WHERE gender = 'Male'")
head(males)
Out[17]:
In [18]:
# Same query as above, filtered to female records.
females <- sql(sqlContext, "SELECT first_name FROM people WHERE gender = 'Female'")
head(females)
Out[18]:
In [19]:
# Distribute R's built-in iris data set as a Spark DataFrame.
# NOTE(review): this rebinds 'df', replacing the earlier faithful DataFrame.
df <- createDataFrame(sqlContext, iris)
In [20]:
# Fit a Gaussian GLM (linear regression) of sepal length on sepal width and
# species via SparkR's glm(). NOTE(review): column names here use '_' where
# the iris data.frame uses '.' — presumably renamed during createDataFrame;
# confirm against printSchema(df).
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")
In [21]:
# Show the fitted model's summary (coefficients).
summary(model)
Out[21]:
In [22]:
# Score the training data with the fitted model, then display actual vs
# predicted sepal length side by side.
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))
Out[22]: