**Bartley Richardson**

16 February 2017

In [1]:

```python
# The commented-out line below reads in the smaller file (10% of the data) for testing
# df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', delimiter=';').load('./bank-additional/bank-additional.csv')
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', delimiter=';').load('./data/bank-additional-full.csv')
```

In [2]:

```python
df.printSchema()
```

We also would like to know how many rows we're dealing with here.

In [3]:

```python
print("Number of rows in dataset = {}".format(df.count()))
```

In [4]:

```python
# Rename the target column "y" to "label"
df = df.withColumnRenamed("y", "label")
```

There are a number of categorical variables that we'd like to use in our classifier, so we'll need to convert them to a numerical type. `StringIndexer` in `pyspark.ml` is useful for this. Although we won't use it in this example, there is a complementary transformer (`IndexToString`) that can map a column of label indices back to their original string (categorical) values. Handy!
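
To make the idea concrete, here is a plain-Python sketch (not Spark code) of what `StringIndexer` does with its default ordering: the most frequent label gets index 0.0, the next most frequent gets 1.0, and so on, while `IndexToString` simply inverts that mapping. The helper names `fit_string_indexer` and `index_to_string` are hypothetical, and breaking frequency ties alphabetically is an assumption of this sketch.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically (an assumption for this sketch)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}

def index_to_string(mapping):
    """Invert the mapping, playing the role of IndexToString."""
    return {index: label for label, index in mapping.items()}

labels = ["no", "no", "yes", "no", "yes", "no"]
mapping = fit_string_indexer(labels)
indexed = [mapping[v] for v in labels]
inverse = index_to_string(mapping)
recovered = [inverse[i] for i in indexed]
print(mapping)              # {'no': 0.0, 'yes': 1.0}
print(recovered == labels)  # True
```

Because "no" is by far the more common answer in this dataset, frequency ordering should give "no" the index 0.0 and "yes" the index 1.0, which conveniently matches the 0/1 labels a binary logistic regression expects.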

For this first model, we're going to select some features seemingly at random (I know, but it's just a first pass to demo the workflow). We'll use one numerical field (`age`) and one categorical field (`loan`). The numerical field can be used as-is, but we'll need `StringIndexer` to handle the categorical one. Since the label is also categorical ("yes" or "no"), we'll index it too. We'll also index `job` now, although it won't be used until the second model.

In [5]:

```python
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol="job", outputCol="job_index"),
            StringIndexer(inputCol="loan", outputCol="loan_index"),
            StringIndexer(inputCol="label", outputCol="label_index")]
```

`Pipeline` is useful when we have multiple steps: it chains the stages together so they can be fitted and applied in a single call.
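
Roughly speaking, Spark's `Pipeline.fit` goes through its stages in order: each estimator is fitted on the current data, and the fitted stage then transforms that data before the next stage sees it; the result is a `PipelineModel` whose `transform` applies every fitted stage. The plain-Python classes below (`IndexerStage`, `FittedIndexer`, `SimplePipeline`, `SimplePipelineModel`) are hypothetical stand-ins that only mimic this flow on a list of dicts; they are not the Spark API.

```python
from collections import Counter

class IndexerStage:
    """Hypothetical estimator: learns a string -> index map for one column."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        counts = Counter(row[self.input_col] for row in rows)
        ordered = sorted(counts, key=lambda s: (-counts[s], s))
        mapping = {s: float(i) for i, s in enumerate(ordered)}
        return FittedIndexer(self.input_col, self.output_col, mapping)

class FittedIndexer:
    """Hypothetical fitted model: adds the indexed column to every row."""
    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        return [dict(row, **{self.output_col: self.mapping[row[self.input_col]]})
                for row in rows]

class SimplePipeline:
    """Fit each stage on the data as transformed by the stages before it."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            model = stage.fit(rows)
            rows = model.transform(rows)
            fitted.append(model)
        return SimplePipelineModel(fitted)

class SimplePipelineModel:
    """Apply every fitted stage in order."""
    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows

rows = [{"loan": "no", "label": "no"},
        {"loan": "yes", "label": "no"},
        {"loan": "no", "label": "yes"}]
pipeline = SimplePipeline([IndexerStage("loan", "loan_index"),
                           IndexerStage("label", "label_index")])
indexed = pipeline.fit(rows).transform(rows)
print(indexed[0])  # {'loan': 'no', 'label': 'no', 'loan_index': 0.0, 'label_index': 0.0}
```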

In [6]:

```python
from pyspark.ml import Pipeline
index_pipeline = Pipeline(stages=indexers)
indexed_df = index_pipeline.fit(df).transform(df)
```

In [7]:

```python
indexed_df.take(1)
```

In [8]:

```python
from pyspark.mllib.regression import LabeledPoint

def parse_data_1(row):
    # Features for the first model: age (numeric) and the indexed loan status
    features = [row.age, row.loan_index]
    return LabeledPoint(row.label_index, features)
```

In [9]:

```python
parsed_data = indexed_df.rdd.map(parse_data_1)
```

And now we'll inspect a few points from the parsed data RDD.

In [10]:

```python
parsed_data.take(2)
```

In [11]:

```python
train, test = parsed_data.randomSplit([0.7, 0.3])
```

In [12]:

```python
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
logit_model_1 = LogisticRegressionWithLBFGS.train(train)
```
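In [13]:

```python
# Pair each true label with the model's prediction on the test set
labels_predictions = test.map(lambda p: (p.label, logit_model_1.predict(p.features)))
# Accuracy = fraction of test points where label == prediction
test_accuracy_1 = labels_predictions.filter(lambda lp: lp[0] == lp[1]).count() / float(test.count())
print("Accuracy of first model = {}".format(test_accuracy_1))
```
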
For the second model, we'll keep `age` (but drop `loan`) and add `job`, `default`, `poutcome` (the outcome of the previous marketing campaign), and three variables that describe how the client was contacted: `campaign` (the number of contacts during this campaign), `pdays` (the number of days since the client was last contacted in a previous campaign), and `previous` (the number of contacts before this campaign). We'd also like to add in an economic factor, so we'll include `euribor3m`.

For completeness, we'll reproduce the steps in creating the above model (including the import statements).

In [14]:

```python
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol="job", outputCol="job_index"),
            StringIndexer(inputCol="label", outputCol="label_index"),
            StringIndexer(inputCol="default", outputCol="default_index"),
            StringIndexer(inputCol="poutcome", outputCol="poutcome_index")]
```

In [15]:

```python
from pyspark.ml import Pipeline
index_pipeline = Pipeline(stages=indexers)
indexed_df = index_pipeline.fit(df).transform(df)
```

Again, let's take one row to make sure we did everything correctly.

In [16]:

```python
indexed_df.take(1)
```

In [17]:

```python
from pyspark.mllib.regression import LabeledPoint

def parse_data_2(row):
    # Numeric fields, the indexed categorical fields, and one economic indicator
    features = [row.age, row.campaign, row.pdays, row.previous, row.poutcome_index,
                row.job_index, row.default_index, row.euribor3m]
    return LabeledPoint(row.label_index, features)
```

In [18]:

```python
parsed_data = indexed_df.rdd.map(parse_data_2)
```

We'll split the data again.
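
Note that `randomSplit([0.7, 0.3])` does not carve out exactly 70% and 30% of the rows. As we understand Spark's implementation, each record is independently assigned to one split with probability proportional to its weight, so the split sizes vary slightly and change from run to run unless a `seed` is passed. It also means the two models are evaluated on different random test sets, so their accuracies aren't a perfectly like-for-like comparison. The plain-Python function below (`random_split` is a hypothetical name) mimics that behavior with a seeded generator.

```python
import random

def random_split(records, weights, seed=None):
    """Assign each record independently to one split, with probability
    proportional to its weight (a sketch of RDD.randomSplit semantics)."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound for this split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for rec in records:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point shortfall
            if u < b or i == len(bounds) - 1:
                splits[i].append(rec)
                break
    return splits

train, test = random_split(range(1000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 700 and 300; exact counts depend on the seed
```

In the notebook itself, fixing the split is a one-argument change, e.g. `parsed_data.randomSplit([0.7, 0.3], seed=42)` (the seed value is arbitrary).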

In [19]:

```python
train, test = parsed_data.randomSplit([0.7, 0.3])
```

In [20]:

```python
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
logit_model_2 = LogisticRegressionWithLBFGS.train(train)
labels_predictions_2 = test.map(lambda p: (p.label, logit_model_2.predict(p.features)))
test_accuracy_2 = labels_predictions_2.filter(lambda lp: lp[0] == lp[1]).count() / float(test.count())
print("Accuracy of second model = {}".format(test_accuracy_2))
```

The accuracy is somewhat improved, but at the cost of a more complex model with more features. In fact, much better models than this one can certainly be built.
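
Accuracy alone can flatter a model on this dataset because the classes are imbalanced: most clients answered "no". A useful sanity check is to compare against a majority-class baseline that always predicts the most common label. The sketch below works on plain Python `(label, prediction)` pairs of the kind held in `labels_predictions`; the toy pairs are made up for illustration.

```python
from collections import Counter

def accuracy(pairs):
    """Fraction of (label, prediction) pairs where the two agree."""
    pairs = list(pairs)
    return sum(1 for label, pred in pairs if label == pred) / float(len(pairs))

def majority_baseline(labels):
    """Accuracy of always predicting the most common label."""
    labels = list(labels)
    most_common_count = Counter(labels).most_common(1)[0][1]
    return most_common_count / float(len(labels))

# Toy data: nine "no" (0.0) and one "yes" (1.0); the "model" always predicts 0.0
pairs = [(0.0, 0.0)] * 9 + [(1.0, 0.0)]
print(accuracy(pairs))                         # 0.9
print(majority_baseline(l for l, _ in pairs))  # 0.9
```

If a model's accuracy is close to this baseline, it may simply be predicting the majority class most of the time.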

For now, we'll move on to an unsupervised learning example.

Supervised learning is great when you have labels, but what about when you don't? Then we can turn to unsupervised techniques.

Word2Vec is a shallow, two-layer neural network that learns word embeddings, i.e., representations of words as vectors in a high-dimensional space. There are many applications for Word2Vec, and they need not be limited to natural language. However, for the purposes of this talk, natural language is the most straightforward and the easiest to understand. Thankfully, Word2Vec is built into many packages and technologies, including MLlib.

In [1]:

```python
from pyspark.mllib.feature import Word2Vec
import pandas as pd
import seaborn as sns
```

In [2]:

```python
# Each line of the analogy file becomes a list of space-separated tokens
doc = sc.textFile("./data/questions-words.txt").map(lambda line: line.split(" "))
```

Just to get a handle on the file, let's see how many lines there are.

In [3]:

```python
doc.count()
```

And now we'll take a look at what's in the file.

In [4]:

```python
doc.take(10)
```

In [5]:

```python
# Sample about 0.1% of the lines, without replacement, with a fixed seed
doc.sample(False, 0.001, 1824).collect()
```

It looks like there *are* other analogies in there. That's a good sign! Ready for the magic? Don't blink, it happens fast.

In [6]:

```python
# Small context window because each analogy line is short; fixed seed for reproducibility
model = Word2Vec().setWindowSize(2).setSeed(7712).fit(doc)
```

Did you miss it? We already created a Word2Vec model. Sure, there are quite a few additional options available. Perhaps the most useful are the vector size (the dimensionality of the embedding) and the window size. The defaults are 100 and 5, respectively. Our sentences (analogies, in this case) aren't long at all, so we've restricted the window size. We've also set the seed manually so that the results are reproducible.
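
To see what the window size controls: MLlib's Word2Vec uses the skip-gram model, in which each word is trained to predict the words around it, up to `windowSize` positions on either side. The plain-Python sketch below (the `skipgram_pairs` helper is hypothetical) lists the (center, context) pairs that a window of 2 would produce for one four-word analogy line. It illustrates the idea only: it always uses the full window, whereas word2vec implementations typically sample a smaller window per word, and it ignores the actual training step.

```python
def skipgram_pairs(tokens, window):
    """Return (center, context) pairs for every word within `window`
    positions of each center word (full, unshrunk window)."""
    pairs = []
    for i, center in enumerate(tokens):
        lo, hi = max(0, i - window), min(len(tokens), i + window + 1)
        for j in range(lo, hi):
            if j != i:
                pairs.append((center, tokens[j]))
    return pairs

line = ["Athens", "Greece", "Baghdad", "Iraq"]
for center, context in skipgram_pairs(line, window=2):
    print(center, "->", context)
```

With a window of 2, "Athens" is never paired with "Iraq"; with the default of 5, every word in a four-word line would be context for every other word, so the smaller window keeps the context local.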

One thing we can do is query the model for synonyms of a particular word. This doesn't necessarily mean the same thing as the English-language definition of synonym; rather, it means words whose embeddings are similar (via cosine similarity) to the embedding of the given word.
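
Here is a small NumPy sketch of that idea: rank every other word in a vocabulary by cosine similarity to the query word's vector and keep the top `num`. The tiny 2-D "embeddings" and the `find_synonyms` helper are made up for illustration; MLlib's `findSynonyms` performs an equivalent ranking over the model's learned vectors.

```python
import numpy as np

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(a.dot(b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def find_synonyms(vectors, word, num):
    """Return the `num` words most cosine-similar to `word` (excluding it)."""
    query = vectors[word]
    scores = [(other, cosine_similarity(query, vec))
              for other, vec in vectors.items() if other != word]
    scores.sort(key=lambda pair: pair[1], reverse=True)
    return scores[:num]

# Made-up 2-D "embeddings" purely for illustration
vectors = {
    "girl": [1.0, 0.1],
    "woman": [0.9, 0.2],
    "boy": [0.1, 1.0],
    "king": [0.0, 0.9],
}
print(find_synonyms(vectors, "girl", 2))  # "woman" ranks first, then "boy"
```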

In [7]:

```python
model.findSynonyms("girl", 10)
```

Makes a little bit of sense, especially if you look hard.

One huge advantage of Word2Vec is the ability to perform vector arithmetic on the embedded words. The classic example is:

```
king - man + woman = queen
```

We can do something similar here, but we'll use:

`girl + brother - boy`

In [8]:

```python
# girl + brother - boy: the hope is that this lands near "sister"
vector_1 = model.transform("girl") + model.transform("brother") - model.transform("boy")
```

In [9]:

```python
model.findSynonyms(vector_1, 10)
```

Does it make sense? A little. We see that most of the words have something to do with being female, and `sister` is even in the list (pretty high, too).
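
The same arithmetic can be reproduced with plain NumPy arrays. In the made-up 3-D vectors below, the axes loosely stand for "female", "male", and "sibling", so `girl + brother - boy` should land closest to `sister`. Excluding the three input words from the results is a convention of this sketch (a plain nearest-vector query may return the inputs themselves), and every name and vector here is hypothetical.

```python
import numpy as np

# Hypothetical 3-D vectors; axes roughly mean (female, male, sibling)
vectors = {
    "girl":    np.array([1.0, 0.0, 0.0]),
    "boy":     np.array([0.0, 1.0, 0.0]),
    "brother": np.array([0.0, 1.0, 1.0]),
    "sister":  np.array([1.0, 0.0, 1.0]),
    "woman":   np.array([0.9, 0.0, 0.1]),
    "king":    np.array([0.0, 0.9, 0.2]),
}

def nearest(vectors, query, exclude=(), num=3):
    """Rank words by cosine similarity to `query`, skipping `exclude`."""
    q = query / np.linalg.norm(query)
    scored = [(w, float(v.dot(q) / np.linalg.norm(v)))
              for w, v in vectors.items() if w not in exclude]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:num]

analogy = vectors["girl"] + vectors["brother"] - vectors["boy"]
result = nearest(vectors, analogy, exclude={"girl", "brother", "boy"})
print(result)  # "sister" comes first, with similarity 1.0
```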

What we did was use the model to transform each word into its vector (a `DenseVector` in this case). Here's what one looks like.

In [10]:

```python
model.transform("brother")
```

This is all well and good, but what if we want to visualize the results? Unless you have a 100-dimensional display, this can get difficult. Thankfully, there are some simple ways we can still get a nice two-dimensional representation of the vectors.

Pick your favorite dimensionality reduction technique. We'll use t-SNE (of course we will, it won a prize!). We're cheating a bit and using sklearn though.

In [11]:

```python
import numpy as np
from sklearn.manifold import TSNE
```

In order to get down to two dimensions, we specify `n_components=2`.

In [12]:

```python
tsne_model = TSNE(n_components=2, random_state=0)
# np.set_printoptions(suppress=True)
```

We'll use the `getVectors()` function to extract the high-dimensional word vectors from the Word2Vec model; it returns a map from each word to its vector.

In [13]:

```python
vects = model.getVectors()
```

Now we can do the dimensionality reduction.

In [14]:

```python
# Stack the vectors into an (n_words, vector_size) array for scikit-learn
tsne_out = tsne_model.fit_transform(np.array(list(vects.values())))
```

Okay, now what? The whole point of doing this was to get a better picture (get it, picture?) of what the word embeddings look like. So we'll plot it.

*Disclaimer: I very much dislike plotting using Matplotlib. Nothing against it or you if you love it; it's just not for me. So what follows isn't super efficient, but it keeps me happy.*

Let's create a Pandas dataframe with the t-SNE (x, y) output.

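In [15]:

```python
tsne_df = pd.DataFrame(tsne_out, columns=['x', 'y'])
```

In [16]:

```python
%matplotlib inline
# `height` was called `size` in seaborn versions before 0.9
ax = sns.lmplot(x='x', y='y', data=tsne_df, fit_reg=False, height=8,
                scatter_kws={'alpha': 0.7, 's': 60})
```

In [17]:

```python
# keys() and values() iterate in the same order for an unmodified mapping,
# so each word lines up with its t-SNE coordinates
tsne_df["word"] = list(vects.keys())
```

In [18]:

```python
tsne_df[(tsne_df['x'] > 0) & (tsne_df['x'] < 11) &
        (tsne_df['y'] > 18) & (tsne_df['y'] < 30)][:10]
```
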
In the above scenario, we're looking at the top-most grouping, just to the right of center. It's primarily composed of adverbs that end in -ly, which is an interesting find.

However, without doing this for every grouping or using a tool like Plotly (which is seriously cool, and you should check it out if you haven't used it... just use the offline mode, though), we're at an impasse. Here's where we'll cheat a bit more in the interest of time. In order to visualize this, we'll export the data and read it into Tableau.

In [39]:

```python
tsne_df.to_csv("./tsne_wordvects.tab", sep='\t')
```

Once the data is in Tableau, we can run *k*-means clustering right inside of it. Let's do this (letting Tableau determine what *k* should be). Tableau chooses *k* = 16, and we can view the results right here.

Keep in mind that this clustering happens *after* performing dimensionality reduction from a high-dimensional space to a two-dimensional space. It may be better to cluster the actual word vectors and then apply t-SNE. We can accomplish this in MLlib.

We can continue to investigate the model in Tableau, or we can try one last thing. We have these word vectors in a high-dimensional space; what if we tried to *learn* something about their embedding in this space *without* doing a visual analysis? What if we applied *k*-means clustering to the vectors?

Let's try! MLlib has *k*-means clustering built right in, so it should be fairly straightforward.

In [19]:

```python
from pyspark.mllib.clustering import KMeans, KMeansModel
```

MLlib's *k*-means expects an RDD of arrays (or vectors). We know what we have to do!

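In [20]:

```python
# Stack the word vectors into a 2-D NumPy array (one row per word)
vectors_only_array = np.asarray(list(vects.values()))
```

In [21]:

```python
# Each row of the array becomes one element of the RDD
vectors_rdd = sc.parallelize(vectors_only_array)
```
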
That wasn't too bad. Now that we have the vectors as arrays in an RDD, we can train the *k*-means model. Of course, we need to define *k* before we can cluster. There are other clustering methods (e.g., Unsupervised Niche Clustering) that don't require *k* to be known *a priori*, but that's a little more than we want to deal with right now. Also, it's not built into MLlib (and we've already cheated plenty).

We'll make an educated guess that *k* is 10. If you like, you can experiment with different values of *k* to see what happens.
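
For intuition about what `KMeans.train` does with `initializationMode="random"`, here is a minimal NumPy version of Lloyd's algorithm: pick `k` random data points as starting centers, then alternate between assigning each point to its nearest center and moving each center to the mean of its points, for at most `max_iterations` rounds. It is a single-machine sketch on made-up data, not MLlib's distributed implementation (which, for example, defaults to `k-means||` initialization). The `kmeans` and `predict` helpers are hypothetical names.

```python
import numpy as np

def kmeans(points, k, max_iterations=10, seed=0):
    """Lloyd's algorithm with random initialization (a sketch)."""
    points = np.asarray(points, dtype=float)
    rng = np.random.RandomState(seed)
    # Random initialization: k distinct data points become the first centers
    centers = points[rng.choice(len(points), size=k, replace=False)]
    for _ in range(max_iterations):
        # Assignment step: index of the nearest center for every point
        dists = np.linalg.norm(points[:, None, :] - centers[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        # Update step: move each center to the mean of its assigned points
        new_centers = np.array([points[labels == j].mean(axis=0)
                                if np.any(labels == j) else centers[j]
                                for j in range(k)])
        if np.allclose(new_centers, centers):
            break
        centers = new_centers
    return centers

def predict(centers, point):
    """Index of the closest center, analogous to KMeansModel.predict."""
    diffs = centers - np.asarray(point, dtype=float)
    return int(np.linalg.norm(diffs, axis=1).argmin())

# Two well-separated, made-up blobs
data = [[0.0, 0.0], [0.2, 0.1], [0.1, 0.3], [5.0, 5.0], [5.2, 4.9], [4.8, 5.1]]
centers = kmeans(data, k=2)
print(predict(centers, [0.1, 0.1]) != predict(centers, [5.0, 5.0]))  # True
```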

In [22]:

```python
clusters = KMeans.train(vectors_rdd, 10,
                        maxIterations=10, initializationMode="random")
```

Excellent! That didn't take long, and now we have a model. At this point, my well-known issues with Matplotlib will continue to affect this data analysis, so we'll pivot away from Python plotting. Since I have access to Tableau, I'm going to get the data into a format I can export and visualize there. If you love Matplotlib or want to learn Plotly, you should definitely do that! I'd love to see the results!

Let's create a dataframe that has the keys (words) and vectors. There's a hiccup if you try to go directly from the Word2Vec dict to a Pandas dataframe (you'll get a column for every value in the vector, all 100 of them), so we'll take a Pythonic approach to fixing this. I realize this is probably not the most efficient way, but it works for now.
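
The hiccup is easy to reproduce on a made-up two-word dictionary standing in for `vects` (with 3-dimensional rather than 100-dimensional vectors): `DataFrame.from_dict(..., orient="index")` spreads every vector component into its own column, while building the rows as `[word, array]` pairs keeps one row per word. This sketch assumes `vects` behaves like a dict-like mapping, as the notebook's use of `.keys()` and `.values()` suggests.

```python
import numpy as np
import pandas as pd

# Hypothetical stand-in for model.getVectors(): word -> list of floats
vects = {"girl": [0.1, 0.2, 0.3], "boy": [0.3, 0.1, 0.0]}

# The "hiccup": each vector component becomes its own column
wide = pd.DataFrame.from_dict(vects, orient="index")
print(wide.shape)  # (2, 3): one column per vector dimension

# One row per word, with the whole vector kept in a single cell
vectors_df = pd.DataFrame([[key, np.asarray(value)] for key, value in vects.items()],
                          columns=["key", "vector"])
print(vectors_df.shape)  # (2, 2)
```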

In [23]:

```python
# Build one [word, vector] row per word so each vector stays in a single cell
individual_rows = []
for key, value in vects.items():
    individual_rows.append([key, np.asarray(list(value))])
vectors_df = pd.DataFrame(individual_rows)
vectors_df.columns = ["key", "vector"]
```

We'll look at what we have, just to make sure we're on the right track.

In [24]:

```python
vectors_df.head()
```

Now we can use the *k*-means clustering model to predict which cluster each key (i.e., word) belongs to, based on its vector representation. We'll do that, then add the results back into the dataframe as their own column.

In [25]:

```python
# Assign each word vector to its nearest cluster center
vectors_df['cluster_label'] = vectors_df['vector'].apply(lambda x: clusters.predict(x))
```

In [26]:

```python
vectors_df.head()
```

In [27]:

```python
tsne_df.head()
```

In [28]:

```python
full_vectors_df = vectors_df.merge(tsne_df, left_on='key',
                                   right_on='word', how='inner')
```

In [29]:

```python
full_vectors_df.head()
```

We don't need the `vector` column for plotting, and there is a repeated column (`key` and `word` are the same), so we'll take a subset of the columns and save them out.

In [30]:

```python
full_vectors_df[['word', 'x', 'y', 'cluster_label']].to_csv("./word_vectors_clusters_n10_tsne.tab", sep='\t')
```

If instead of k=10, we increase the number of clusters to 16, we can achieve the plot below.

Inspiration taken from: https://github.com/jadianes/spark-py-notebooks

Word2Vec example: https://www.tensorflow.org/tutorials/word2vec/

Original Google Word2Vec code: https://code.google.com/archive/p/word2vec/

When using the `StringIndexer` function, metadata about the indexed column is stored in the dataframe. Although not visible to the user, it is there and can be used to convert the index back to the original categorical value. Below is an example of using `IndexToString` to recover the original categorical values.

In [55]:

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString
# IndexToString reads the label metadata that StringIndexer attached to each column
converters = [IndexToString(inputCol="job_index", outputCol="orig_job_category"),
              IndexToString(inputCol="default_index", outputCol="orig_default_category"),
              IndexToString(inputCol="poutcome_index", outputCol="orig_poutcome_category")]
converter_pipeline = Pipeline(stages=converters)
converted = converter_pipeline.fit(indexed_df).transform(indexed_df)
```

In [57]:

```python
converted.take(2)
```