This is implemented using Apache Toree.

When installing the Toree kernel, please add the spark-csv package to your Spark options:

jupyter toree install --spark_opts="--packages com.databricks:spark-csv_2.10:1.3.0"


In [1]:
sc


Out[1]:
org.apache.spark.SparkContext@43c33e98

Now we start by loading some data. It is in CSV format, so it's a good thing we already have that csv package included for us.

Note: the data is a modified version of https://archive.ics.uci.edu/ml/datasets/Adult


In [22]:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("resources/adult.data")

In [23]:
df.printSchema()


root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: string (nullable = true)
 |-- maritial-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- category: string (nullable = true)

So, as we can see, Spark has simply loaded all of the values as strings, since we haven't specified another schema. We can instead ask it to infer the schema, which also takes care of the stray leading spaces when parsing the numeric fields:


In [26]:
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("./resources/adult.data")

In [27]:
df.cache()


Out[27]:
[age: int, workclass: string, fnlwgt: double, education: string, education-num: double, maritial-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: double, capital-loss: double, hours-per-week: double, native-country: string, category: string]

In [28]:
df.head()


Out[28]:
[39, State-gov,77516.0, Bachelors,13.0, Never-married, Adm-clerical, Not-in-family, White, Male,2174.0,0.0,40.0, United-States, <=50K]

In [29]:
df.printSchema()


root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- maritial-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- category: string (nullable = true)
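Schema inference costs an extra pass over the data, and it can occasionally guess a type we don't want. An alternative is to supply the schema explicitly. A minimal sketch, assuming the same file; only the first few columns are written out here, and the rest would need to be added in the same way:

```scala
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, DoubleType, StringType}

// Hypothetical explicit schema: only the first three columns are shown,
// the remaining twelve fields would be listed the same way.
val adultSchema = StructType(Array(
  StructField("age", IntegerType, nullable = true),
  StructField("workclass", StringType, nullable = true),
  StructField("fnlwgt", DoubleType, nullable = true)
))

val dfExplicit = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(adultSchema)
  .load("./resources/adult.data")
```

With an explicit schema, no inference pass is made and a malformed value will surface as a null (or an error) rather than silently changing a column's type.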

Now we import the vector classes along with the VectorAssembler, StringIndexer, Pipeline, and DecisionTreeClassifier:


In [24]:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, DecisionTreeClassifier}

The first step is preparing the features; here we are just choosing existing numeric features:


In [5]:
val assembler = new VectorAssembler().setInputCols(Array("age", "education-num")).setOutputCol("features")

Now the classifier's label column must be numeric (doubles), so we need to take our category and turn it into a double. The StringIndexer will do this for us:


In [6]:
val indexer = new StringIndexer().setInputCol("category").setOutputCol("category-index")

Now we put both stages together into a pipeline:


In [8]:
val pipeline = new Pipeline().setStages(Array(assembler, indexer))

We then need to "fit" our pipeline. This allows the StringIndexer to determine what strings will be assigned what indexes in the eventual transformation:


In [9]:
val model = pipeline.fit(df)

In [10]:
val prepared = model.transform(df)

In [31]:
prepared.head()


Out[31]:
[39, State-gov,77516.0, Bachelors,13.0, Never-married, Adm-clerical, Not-in-family, White, Male,2174.0,0.0,40.0, United-States, <=50K,[39.0,13.0],0.0]

In [11]:
val classifier = new DecisionTreeClassifier().setFeaturesCol("features").setLabelCol("category-index")

And now we fit the classifier on the prepared data:


In [12]:
val classifier_model = classifier.fit(prepared)

In [13]:
val pipeline_and_model = new Pipeline().setStages(Array(assembler, indexer, classifier_model))
val pipeline_model = pipeline_and_model.fit(df)

In [14]:
classifier_model.transform(prepared).select("prediction", "category-index").take(20).foreach(println)


[1.0,0.0]
[1.0,0.0]
[0.0,0.0]
[0.0,0.0]
[0.0,0.0]
[1.0,0.0]
[0.0,0.0]
[0.0,1.0]
[0.0,1.0]
[1.0,1.0]
[0.0,1.0]
[0.0,1.0]
[0.0,0.0]
[0.0,0.0]
[0.0,1.0]
[0.0,0.0]
[0.0,0.0]
[0.0,0.0]
[0.0,0.0]
[1.0,1.0]

In [15]:
pipeline_model.transform(df).select("prediction", "category-index").take(20).foreach(println)


[1.0,0.0]
[1.0,0.0]
[0.0,0.0]
[0.0,0.0]
[0.0,0.0]
[1.0,0.0]
[0.0,0.0]
[0.0,1.0]
[0.0,1.0]
[1.0,1.0]
[0.0,1.0]
[0.0,1.0]
[0.0,0.0]
[0.0,0.0]
[0.0,1.0]
[0.0,0.0]
[0.0,0.0]
[0.0,0.0]
[0.0,0.0]
[1.0,1.0]

In [16]:
import org.apache.spark.ml.feature.StringIndexerModel

val labels = pipeline_model.stages(1).asInstanceOf[StringIndexerModel].labels
labels


Out[16]:
Array(" <=50K", " >50K")

In [17]:
import org.apache.spark.ml.feature.IndexToString

val inverter = new IndexToString().setInputCol("prediction").setOutputCol("prediction-label").setLabels(labels)

inverter.transform(pipeline_model.transform(df)).select("prediction-label", "category").take(20).foreach(println)


[ >50K, <=50K]
[ >50K, <=50K]
[ <=50K, <=50K]
[ <=50K, <=50K]
[ <=50K, <=50K]
[ >50K, <=50K]
[ <=50K, <=50K]
[ <=50K, >50K]
[ <=50K, >50K]
[ >50K, >50K]
[ <=50K, >50K]
[ <=50K, >50K]
[ <=50K, <=50K]
[ <=50K, <=50K]
[ <=50K, >50K]
[ <=50K, <=50K]
[ <=50K, <=50K]
[ <=50K, <=50K]
[ <=50K, <=50K]
[ >50K, >50K]
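Eyeballing twenty rows only goes so far. To put a number on the model we can score the predictions against the indexed labels with a MulticlassClassificationEvaluator. A sketch against the Spark 1.x ML API used in this notebook; note that this evaluates on the training data, so the score will be optimistic:

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Compare the predicted index against the true category index.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("category-index")
  .setPredictionCol("prediction")
  .setMetricName("precision") // Spark 1.x metric name; "accuracy" arrived in 2.0

val predictions = pipeline_model.transform(df)
val score = evaluator.evaluate(predictions)
println(s"precision on the training data: $score")
```

For an honest estimate, the data would be split with df.randomSplit(Array(0.8, 0.2)) and the pipeline fit only on the training portion before evaluating on the held-out rows.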
