Bartley Richardson
16 February 2017
This notebook was created for the NOVA Data Science meetup on 16 February 2017. The focus of the notebook is on basic PySpark MLLib usage. All examples and code are provided as-is with no guarantee of correctness. The user assumes all risks from reusing this code.
We're going to use the Bank Marketing data set from 2012. It includes some categorical features as well as numeric features, and there are only two target classes (yes or no, depending on whether the client subscribed to a term deposit). This is convenient since PySpark's version of MLLib does not currently support multi-class classification using logistic regression.
In [1]:
# first line reads in the smaller file (10% of data) for testing
# df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', delimiter=';').load('./bank-additional/bank-additional.csv')
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', delimiter=';').load('./data/bank-additional-full.csv')
It's useful to see the schema of the data we just imported, even though we know what the website says.
In [2]:
df.printSchema()
We also would like to know how many rows we're dealing with here.
In [3]:
print "Number of rows in dataset =", df.count()
The target variable is named y, so let's rename it to something that makes a bit more sense (we'll use "label" in this case).
In [4]:
df = df.withColumnRenamed("y","label")
There are a number of categorical variables that we'd like to use in our classifier, so we'll need to convert them to a numerical type. StringIndexer in pyspark.ml is useful for this. Although we won't use it in this example, there is a complementary function (IndexToString) that can map a column of label indices back to their original string (original categorical) values. Handy!
For this first model, we're going to select some features seemingly at random (I know, but it's just the first pass through to demo the process). We'll use one numerical field as well as one categorical field. The numerical field is fine as-is, but we'll need to use StringIndexer to deal with the categorical field. Since the label is also categorical ("yes" or "no"), we'll have to deal with that too.
In [5]:
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol="job", outputCol="job_index"),
            StringIndexer(inputCol="loan", outputCol="loan_index"),
            StringIndexer(inputCol="label", outputCol="label_index")]
The Pipeline function is useful when we have multiple steps.
In [6]:
from pyspark.ml import Pipeline
index_pipeline = Pipeline(stages=indexers)
indexed_df = index_pipeline.fit(df).transform(df)
If we inspect one row of indexed_df, we can see that we now have numeric index values for the categorical fields.
In [7]:
indexed_df.take(1)
Out[7]:
The logistic regression algorithm in PySpark MLLib is expecting an RDD of LabeledPoint values. We'll need to write a simple parsing function to create this.
In [8]:
from pyspark.mllib.regression import LabeledPoint
def parse_data_1(row):
    features = [row.age, row.loan_index]
    return LabeledPoint(row.label_index, features)
In [9]:
parsed_data = indexed_df.rdd.map(parse_data_1)
And now we'll inspect a few points from the parsed data RDD.
In [10]:
parsed_data.take(2)
Out[10]:
In order to train the model, we'll split our data into training (70%) and testing (30%). We can use the built-in randomSplit function for this.
In [11]:
train, test = parsed_data.randomSplit([0.7, 0.3])
We're finally ready to train our logistic regression model. For this case, we're going to use LogisticRegressionWithLBFGS (more information on limited-memory BFGS is available if you're interested).
In [12]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
logit_model_1 = LogisticRegressionWithLBFGS.train(train)
Now that we have the model, we'll use the testing data to evaluate its accuracy. We'll use the model we created above to predict labels for the testing data and then compare them to the actual labels.
In [13]:
labels_predictions = test.map(lambda p: (p.label, logit_model_1.predict(p.features)))
test_accuracy_1 = labels_predictions.filter(lambda (v, p): v == p).count() / float(test.count())
print "Accuracy of first model =", test_accuracy_1
This isn't terrible, but we can maybe do better. Let's introduce a few more features and create a second model.
In addition to age and job, we're now also going to use default, poutcome (the outcome of the previous marketing campaign), and two variables (pdays and previous) that describe how the client was contacted in the previous campaign, plus campaign (the number of contacts during the current one). We'd also like to add in some economic factors, so we'll include euribor3m.
For completeness, we'll reproduce the steps in creating the above model (including the import statements).
In [14]:
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol="job", outputCol="job_index"),
            StringIndexer(inputCol="label", outputCol="label_index"),
            StringIndexer(inputCol="default", outputCol="default_index"),
            StringIndexer(inputCol="poutcome", outputCol="poutcome_index")]
In [15]:
from pyspark.ml import Pipeline
index_pipeline = Pipeline(stages=indexers)
indexed_df = index_pipeline.fit(df).transform(df)
Again, let's take one row to make sure we did everything correctly.
In [16]:
indexed_df.take(1)
Out[16]:
Looks good! Now we'll slightly modify the parse_data_1 function to create one that deals with the additional features.
In [17]:
from pyspark.mllib.regression import LabeledPoint
def parse_data_2(row):
    features = [row.age, row.campaign, row.pdays, row.previous, row.poutcome_index,
                row.job_index, row.default_index, row.euribor3m]
    return LabeledPoint(row.label_index, features)
In [18]:
parsed_data = indexed_df.rdd.map(parse_data_2)
We'll split the data again.
In [19]:
train, test = parsed_data.randomSplit([0.7, 0.3])
In [20]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
logit_model_2 = LogisticRegressionWithLBFGS.train(train)
labels_predictions_2 = test.map(lambda p: (p.label, logit_model_2.predict(p.features)))
test_accuracy_2 = labels_predictions_2.filter(lambda (v, p): v == p).count() / float(test.count())
print "Accuracy of second model =", test_accuracy_2
The accuracy is somewhat improved, but at the cost of adding complexity to the model in terms of the number of features. In fact, much better models could certainly be built.
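Accuracy alone can also be misleading on a data set like this one, where most clients answer "no". As a rough sketch of an alternative metric (not part of the original walkthrough), we could clear the model's threshold so predict() returns raw scores and compute the area under the ROC curve with MLLib's BinaryClassificationMetrics:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# clearing the threshold makes predict() return raw scores instead of 0/1 labels
logit_model_2.clearThreshold()

# BinaryClassificationMetrics expects an RDD of (score, label) pairs
score_and_labels = test.map(lambda p: (float(logit_model_2.predict(p.features)), p.label))
metrics = BinaryClassificationMetrics(score_and_labels)
print "Area under ROC for second model =", metrics.areaUnderROC

# restore a threshold so later calls to predict() return class labels again
logit_model_2.setThreshold(0.5)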
For now, we'll move on to an unsupervised learning example.
Supervised learning is great when you have labels, but what about when you don't? Then we can turn to unsupervised techniques.
Word2Vec is a shallow, two-layer neural network that learns word embeddings, mapping words into a high-dimensional vector space. There are many applications for Word2Vec, and they need not be limited to natural language. However, for the purposes of this talk, natural language is the most straightforward and easy to understand. Thankfully, Word2Vec is built into many packages and technologies, including MLLib.
In [1]:
from pyspark.mllib.feature import Word2Vec
import pandas as pd
import seaborn as sns
In [2]:
doc = sc.textFile("./data/questions-words.txt").map(lambda line: line.split(" "))
Just to get a handle on the file, let's see how many lines there are.
In [3]:
doc.count()
Out[3]:
And now we'll take a look at what's in the file.
In [4]:
doc.take(10)
Out[4]:
In [5]:
doc.sample(False, 0.001, 1824).collect()
Out[5]:
Okay, so there are other analogies in there. That's a good sign! Ready for the magic? Don't blink, it happens fast.
In [6]:
model = Word2Vec().setWindowSize(2).setSeed(7712).fit(doc)
Did you miss it? We already created a Word2Vec model. Sure, there are quite a few additional options available. Perhaps the most useful are the vector size (the dimensionality used for the embedding) and the window size. The defaults are 100 and 5, respectively. Our sentences (analogies, in this case) aren't long at all, so we've restricted the window size. We've also set the seed manually so the results are reproducible.
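For reference, here's a sketch of what tuning those options could look like; the specific values below (a 50-dimensional embedding and a minimum word count of 5) are purely illustrative, not recommendations.
# illustrative settings only: smaller vectors train faster, minCount drops rare words
alt_model = Word2Vec().setVectorSize(50) \
                      .setWindowSize(2) \
                      .setMinCount(5) \
                      .setSeed(7712) \
                      .fit(doc)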
One thing we can do is query the model for synonyms of a particular word. This doesn't necessarily mean the same thing as the English-language definition of synonym; rather, it means words whose embeddings are similar (via cosine similarity) to that of the given word.
In [7]:
model.findSynonyms("girl", 10)
Out[7]:
Makes a little bit of sense, especially if you look hard.
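To make the "similar embedding (via cosine similarity)" idea concrete, here's a quick hand-rolled check with NumPy (just a sketch; findSynonyms does this across the whole vocabulary for you):
import numpy as np

# pull two embeddings out as plain NumPy arrays
v_girl = model.transform("girl").toArray()
v_boy = model.transform("boy").toArray()

# cosine similarity: dot product divided by the product of the vector norms
cos_sim = np.dot(v_girl, v_boy) / (np.linalg.norm(v_girl) * np.linalg.norm(v_boy))
print "Cosine similarity between 'girl' and 'boy' =", cos_sim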
One huge advantage of Word2Vec is the ability to perform vector arithmetic on the embedded words. The classic example is:
king - man + woman = queen
We can do something similar here, but we'll use:
girl + brother - boy
In [8]:
vector_1 = model.transform("girl")+model.transform("brother")-model.transform("boy")
In [9]:
model.findSynonyms(vector_1, 10)
Out[9]:
Does it make sense? A little. We see that most of the words have something to do with being female, and sister is even in the list (pretty high, too).
What we did was use the model to transform the word into the vector (a DenseVector in this case). It looks like this.
In [10]:
model.transform("brother")
Out[10]:
This is all well and good, but what if we want to visualize the results? Unless you have a 100-dimensional display, this can get difficult. Thankfully, there are some simple ways we can still get a nice two-dimensional representation of the vectors.
Pick your favorite dimensionality reduction technique. We'll use t-SNE (of course we will, it won a prize!). We're cheating a bit and using sklearn though.
In [11]:
import numpy as np
from sklearn.manifold import TSNE
In order to get down to two dimensions, we specify n_components=2.
In [12]:
tsne_model = TSNE(n_components=2, random_state=0)
# np.set_printoptions(suppress=True)
We don't want to run t-SNE on the words themselves, just the vectors, so we'll use the built-in getVectors() function to extract only the high-dimensional vectors from the Word2Vec model.
In [13]:
vects = model.getVectors()
Now we can do the dimensionality reduction.
In [14]:
tsne_out = tsne_model.fit_transform(vects.values())
Okay, now what? The whole point of doing this was to get a better picture (get it, picture?) of what the word embeddings look like. So we'll plot it.
Disclaimer: I very much dislike plotting using Matplotlib. Nothing against it or you if you love it; it's just not for me. So what follows isn't super efficient, but it keeps me happy.
Let's create a Pandas dataframe with the t-SNE (x, y) output.
In [15]:
tsne_df = pd.DataFrame(tsne_out, columns=['x','y'])
Now we can plot it! I'll use Seaborn just to make it pretty. And don't forget to set alpha transparency!
In [16]:
%matplotlib inline
ax = sns.lmplot('x', 'y', tsne_df, fit_reg=False, size=8,
                scatter_kws={'alpha':0.7, 's':60})
It looks nice, and you can even see clusters (although they may just be in your imagination). Without plotting again and adding labels to the plot, we can add the words back into the dataframe and then manually slice it to see which words appear together.
In [17]:
tsne_df["word"] = vects.keys()
In [18]:
tsne_df[(tsne_df['x']>0) & (tsne_df['x']<11) &
        (tsne_df['y']>18) & (tsne_df['y']<30)][:10]
Out[18]:
In the above scenario, we're looking at the top-most grouping, just to the right of center. It's primarily composed of adverbs that end in -ly, which is an interesting find.
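If you'd rather check that observation programmatically than eyeball the slice, a quick (purely illustrative) pandas filter does the trick:
# where do the words ending in -ly sit in the t-SNE plane?
tsne_df[tsne_df['word'].str.endswith('ly')].head(10)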
However, without doing this for every grouping or using a tool like Plotly (which is seriously cool, and you should check it out if you haven't used it...just use the offline mode though), we're at an impasse. Here's where we'll cheat a bit more in the interest of time: in order to visualize this, we'll export the data and read it into Tableau.
In [39]:
tsne_df.to_csv("./tsne_wordvects.tab", sep='\t')
From Tableau, we can obtain a plot of this data. We'll color it by words that represent a country (orange) and those that don't. We see that many of the countries group together, with the exception of Georgia. Since Georgia is also a US state, it makes sense that it might be confused with the state and grouped with US states.
As of Tableau v10, you can do k-means clustering right inside of Tableau. Let's do that (letting Tableau determine what k should be). Tableau chooses k=16, and we can view the results right here.
This isn't terribly exciting. We see some nice results (such as the US cities and states, cluster 9). Also, cluster 7 is mostly relationships/family names (e.g., brother, stepson, stepdaughter, niece). However, there's an inherent issue with clustering after performing dimensionality reduction from a high-dimensional space to a two-dimensional space. It may be better to cluster the actual word vectors and then apply t-SNE. We can accomplish this in MLLib.
We can continue to investigate the model in Tableau, or we can try one last thing. We have these word vectors in high-dimensional space; what if we tried to learn something about their embedding in this space without doing a visual analysis? What if we applied k-means clustering to the vectors?
Let's try! MLLib has k-means clustering built right in, so it should be fairly straightforward.
In [19]:
from pyspark.mllib.clustering import KMeans, KMeansModel
Time to do a little data massaging to get our vectors into a format that pyspark.mllib.clustering.KMeans wants. MLLib's implementation of k-means is expecting an RDD of arrays. We know what we have to do!
In [20]:
vectors_only_array = np.asanyarray(vects.values())
In [21]:
vectors_rdd = sc.parallelize(vectors_only_array)
That wasn't too bad. Now that we have the vectors as arrays in an RDD, we can train the k-means model. Of course, we need to define k before we can cluster. There are other clustering methods (e.g., Unsupervised Niche Clustering) that don't require k to be known a priori, but that's a little more than we want to deal with right now. Also, it's not built into MLLib (and we've already cheated plenty).
We'll make an educated guess that k is 10. If you like, you can experiment with different values of k to see what happens.
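If you'd rather not guess, here's a rough sketch of an elbow-style search: train a model for a few candidate values of k and compare the within-set sum of squared errors via computeCost. The candidate values below are arbitrary.
# rough elbow-method sketch: look for where the cost stops dropping quickly
for k in [5, 10, 15, 20]:
    candidate = KMeans.train(vectors_rdd, k,
                             maxIterations=10, initializationMode="random")
    print "k =", k, "WSSSE =", candidate.computeCost(vectors_rdd)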
In [22]:
clusters = KMeans.train(vectors_rdd, 10,
                        maxIterations=10, initializationMode="random")
Excellent! That didn't take long, and now we have a model. At this point, my well-known issues with Matplotlib continue to affect this analysis, so we'll pivot away from Python plotting. Since I have access to Tableau, I'm going to get the data into a format where I can export it and visualize it there. If you prefer and love Matplotlib or want to learn Plotly, you should definitely do that! I'd love to see the results!
Let's create a dataframe that has the keys (words) and vectors. There's a hiccup if you try to go directly from the Word2Vec dict to a Pandas dataframe (you'll get a column for every value in the vector, all 100 of them), so we'll take a Pythonic approach to fixing this. I realize this is probably not the most efficient way, but it works for now.
In [23]:
individual_rows = []
for key, value in vects.iteritems():
    individual_rows.append([key, np.asarray(list(value))])

vectors_df = pd.DataFrame(individual_rows)
vectors_df.columns = ["key", "vector"]
We'll look at what we have, just to make sure we're on the right track.
In [24]:
vectors_df.head()
Out[24]:
Looks good! Now we need to use the k-means clustering model to predict what cluster each key (i.e., word) belongs to using its vector representation. We'll do that, then add the results back in to the dataframe as their own column.
In [25]:
vectors_df['cluster_label'] = vectors_df['vector'].apply(lambda x: clusters.predict(x))
In [26]:
vectors_df.head()
Out[26]:
If we're going to plot the data in two dimensions, we'll need the t-SNE dimensionality reduction (i.e., the (x, y) coordinates). We already have that calculated; let's remind ourselves what that dataframe looks like.
In [27]:
tsne_df.head()
Out[27]:
Almost there! We'll merge the two dataframes to create one dataframe to rule them all. Then we'll take a look at the first few rows.
In [28]:
full_vectors_df = vectors_df.merge(tsne_df, left_on='key',
                                   right_on='word', how='inner')
In [29]:
full_vectors_df.head()
Out[29]:
Looks good. The last step is to save this dataframe out so we can load it into Tableau. We don't need the vector column, and there is a repeated column (key and word are the same), so we'll take a subset of the columns and save them out.
In [30]:
full_vectors_df[['word','x','y','cluster_label']].to_csv("./word_vectors_clusters_n10_tsne.tab", sep='\t')
Now we can complete the analysis in Tableau. We can import the images here. Below is the plot of the t-SNE results after clustering with k=10.
If instead of k=10, we increase the number of clusters to 16, we can achieve the plot below.
Inspiration taken from: https://github.com/jadianes/spark-py-notebooks
Word2Vec example: https://www.tensorflow.org/tutorials/word2vec/
Original Google Word2Vec code: https://code.google.com/archive/p/word2vec/
As part of the StringIndexer transformation, metadata about the indexed column is stored in the dataframe. Although not visible to the user, it is there and can be used to convert the index back to the original categorical value. Below is an example of using IndexToString to recover the original categorical values.
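If you're curious what that hidden metadata looks like, you can pull it straight off the dataframe's schema. The exact contents of the dict can vary between Spark versions, so treat this as a sketch:
# the StringIndexer stores its label-to-index mapping as column metadata on the schema
for field in indexed_df.schema.fields:
    if field.name == "job_index":
        print field.metadata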
In [55]:
from pyspark.ml.feature import IndexToString
converters = [IndexToString(inputCol="job_index", outputCol="orig_job_category"),
              IndexToString(inputCol="default_index", outputCol="orig_default_category"),
              IndexToString(inputCol="poutcome_index", outputCol="orig_poutcome_category")]
converter_pipeline = Pipeline(stages=converters)
converted = converter_pipeline.fit(indexed_df).transform(indexed_df)
In [57]:
converted.take(2)
Out[57]: