Preparación

Establecemos las variables de ambiente


In [ ]:
Sys.setenv(SPARK_HOME='/opt/spark')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))

Cargamos la librería de SparkR


In [ ]:
library(SparkR)

Creamos el contexto de Spark, y agregando la biblioteca de spark-csv


In [ ]:
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.2.0")

Por último, creamos el contexto de SparkSQL


In [ ]:
sqlContext <- sparkRSQL.init(sc)

La documentación está aquí y aquí

Dataframes

Usando el ejemplo clásico del dataset iris


In [ ]:
iris.rdd <- createDataFrame(sqlContext, iris)

In [ ]:
head(iris.rdd)

También se pueden crear los dataframes desde archivo


In [ ]:
tsum_a <- read.df(sqlContext, "data/data_tsumoto/TSUM_A.CSV", "com.databricks.spark.csv", header="true")

In [ ]:
head(tsum_a)

In [ ]:
printSchema(tsum_a)

In [ ]:
head(select(tsum_a, tsum_a$SEX))

In [ ]:
head(select(tsum_a, 'Admission'))

In [ ]:
head(filter(tsum_a, tsum_a$SEX == 'M'))

In [ ]:
head(summarize(groupBy(tsum_a, tsum_a$SEX), count = n(tsum_a$SEX)))

In [ ]:
tsum_a_summary <- describe(tsum_a)

In [ ]:
tsum_a_summary

In [ ]:
nrow(tsum_a)

In [ ]:
?collect # Collect convierte el objeto Spark DataFrame a un data.frame de R

Lo cual permitirá utilizar funciones de R clásicas como ggplot


In [ ]:
registerTempTable(tsum_a, "tsum_a")

In [ ]:
vacios <- collect(sql(sqlContext, "select * from tsum_a where SEX = ''")) # Esto ya es un data.frame de R

In [ ]:
vacios

In [ ]:
head(vacios, n = 20)

In [ ]:
library(ggplot2)

In [ ]:
ggplot(data=vacios, aes(x=factor(Diagnosis))) + 
    geom_bar() + 
    theme(axis.text.x=element_text(angle = -45, hjust = 0))

SparkR al igual que pyspark, puede leer fácilmente archivos JSON


In [ ]:
projects <- read.df(sqlContext = sqlContext, path = "data/world_bank/world_bank.json", source = "json")

In [ ]:
projects # Es un SparkR DataFrame, no un data.frame de R

In [ ]:
printSchema(projects)

In [ ]:
registerTempTable(projects, 'wb_projects')

In [ ]:
head(sql(sqlContext, 'select countryname, source from wb_projects order by source, countryname limit 5')) 
# Esto es un Spark DataFrame

Manipulación de datos


In [ ]:
tsum_a

In [ ]:
collect(select(tsum_a, "SEX", "Diagnosis")) # Seleccionar un conjunto de columnas

El filtrado se realiza normalmente (nota las diferentes maneras de hacer referencia a las columnas)


In [ ]:
head(filter(tsum_a, tsum_a$Diagnosis == "SLE"))

In [ ]:
head(summarize(groupBy(tsum_a, tsum_a$Admission), count=n(tsum_a$Admission)))

Probablemente sean ruido esos caracteres raros, ordenemos


In [ ]:
conteo_admisiones <- summarize(groupBy(tsum_a, tsum_a$Admission), count=n(tsum_a$Admission))
head(arrange(conteo_admisiones, desc(conteo_admisiones$count)), n = 10)