Establecemos las variables de ambiente
In [ ]:
Sys.setenv(SPARK_HOME='/opt/spark')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'), .libPaths()))
Cargamos la librería de SparkR
In [ ]:
library(SparkR)
Creamos el contexto de Spark, y agregando la biblioteca de spark-csv
In [ ]:
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.2.0")
Por último, creamos el contexto de SparkSQL
In [ ]:
sqlContext <- sparkRSQL.init(sc)
Usando el ejemplo clásico del dataset iris
In [ ]:
iris.rdd <- createDataFrame(sqlContext, iris)
In [ ]:
head(iris.rdd)
También se pueden crear los dataframes desde archivo
In [ ]:
tsum_a <- read.df(sqlContext, "data/data_tsumoto/TSUM_A.CSV", "com.databricks.spark.csv", header="true")
In [ ]:
head(tsum_a)
In [ ]:
printSchema(tsum_a)
In [ ]:
head(select(tsum_a, tsum_a$SEX))
In [ ]:
head(select(tsum_a, 'Admission'))
In [ ]:
head(filter(tsum_a, tsum_a$SEX == 'M'))
In [ ]:
head(summarize(groupBy(tsum_a, tsum_a$SEX), count = n(tsum_a$SEX)))
In [ ]:
tsum_a_summary <- describe(tsum_a)
In [ ]:
tsum_a_summary
In [ ]:
nrow(tsum_a)
In [ ]:
?collect # Collect convierte el objeto Spark DataFrame a un data.frame de R
Lo cual permitirá utilizar funciones de R clásicas como ggplot
In [ ]:
registerTempTable(tsum_a, "tsum_a")
In [ ]:
vacios <- collect(sql(sqlContext, "select * from tsum_a where SEX = ''")) # Esto ya es un data.frame de R
In [ ]:
vacios
In [ ]:
head(vacios, n = 20)
In [ ]:
library(ggplot2)
In [ ]:
ggplot(data=vacios, aes(x=factor(Diagnosis))) +
geom_bar() +
theme(axis.text.x=element_text(angle = -45, hjust = 0))
SparkR al igual que pyspark, puede leer fácilmente archivos JSON
In [ ]:
projects <- read.df(sqlContext = sqlContext, path = "data/world_bank/world_bank.json", source = "json")
In [ ]:
projects # Es un SparkR DataFrame, no un data.frame de R
In [ ]:
printSchema(projects)
In [ ]:
registerTempTable(projects, 'wb_projects')
In [ ]:
head(sql(sqlContext, 'select countryname, source from wb_projects order by source, countryname limit 5'))
# Esto es un Spark DataFrame
In [ ]:
tsum_a
In [ ]:
collect(select(tsum_a, "SEX", "Diagnosis")) # Seleccionar un conjunto de columnas
El filtrado se realiza normalmente (nota las diferentes maneras de hacer referencia a las columnas)
In [ ]:
head(filter(tsum_a, tsum_a$Diagnosis == "SLE"))
In [ ]:
head(summarize(groupBy(tsum_a, tsum_a$Admission), count=n(tsum_a$Admission)))
Probablemente sean ruido esos caracteres raros, ordenemos
In [ ]:
conteo_admisiones <- summarize(groupBy(tsum_a, tsum_a$Admission), count=n(tsum_a$Admission))
head(arrange(conteo_admisiones, desc(conteo_admisiones$count)), n = 10)