DATASCI W261: Machine Learning at Scale

Week 12, Homework 11

Katrina Adams

kradams@ischool.berkeley.edu
26 November 2015


Start Spark


In [1]:
%cd ~/Documents/W261/hw11/


/Users/davidadams/Documents/W261/hw11

In [2]:
import os
import sys

spark_home = os.environ['SPARK_HOME'] = \
   '/Users/davidadams/packages/spark-1.5.1-bin-hadoop2.6/'

if not spark_home:
    raise ValueError('SPARK_HOME enviroment variable is not set')
sys.path.insert(0,os.path.join(spark_home,'python'))
sys.path.insert(0,os.path.join(spark_home,'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home,'python/pyspark/shell.py'))


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
SparkContext available as sc, HiveContext available as sqlContext.

HW11.0 Broadcast versus Caching in Spark

What is the difference between broadcasting and caching data in Spark? Give an example (in the context of machine learning) of each mechanism (at a high level). Feel free to cut and paste code examples from the lectures to support your answer.

Review the following Spark-notebook-based implementation of KMeans and use the broadcast pattern to make this implementation more efficient. Please describe your changes in English first, implement, comment your code and highlight your changes:

http://nbviewer.ipython.org/urls/dl.dropbox.com/s/41q9lgyqhy8ed5g/EM-Kmeans.ipynb

Caching and broadcasting both add values to memory of the worker nodes. Caching occurs for each RDD whereas broadcasting sends a value to each worker. Caching is used to store intermediate data in memory so it can be reused later (eg. training data), whereas broadcasting is used to store a shared variable in memory so it can be used by eack worker (eg. regtression weights).

For the KMeans implementation in Spark, I added a broadcast statement for the new centroids at each iteration. The nearest centroids function then accesses the broadcasted value of the centroids instead of the centroids being serialized and passed with each RDD.


In [3]:
%matplotlib inline
import numpy as np
import pylab 
import json

def generateData():
    size1 = size2 = size3 = 1000
    samples1 = np.random.multivariate_normal([4, 0], [[1, 0],[0, 1]], size1)
    data = samples1
    samples2 = np.random.multivariate_normal([6, 6], [[1, 0],[0, 1]], size2)
    data = np.append(data,samples2, axis=0)
    samples3 = np.random.multivariate_normal([0, 4], [[1, 0],[0, 1]], size3)
    data = np.append(data,samples3, axis=0)
    # Randomlize data
    data = data[np.random.permutation(size1+size2+size3),]
    np.savetxt('data.csv',data,delimiter = ',')
    
    return None

generateData()



#Calculate which class each data point belongs to
def nearest_centroid(line):
    x = np.array([float(f) for f in line.split(',')])
    ## Use broadcast value of centroids
    closest_centroid_idx = np.sum((x - centroidsBroadcast.value)**2, axis=1).argmin()
    return (closest_centroid_idx,(x,1))

#plot centroids and data points for each iteration
def plot_iteration(means):
    pylab.plot(samples1[:, 0], samples1[:, 1], '.', color = 'blue')
    pylab.plot(samples2[:, 0], samples2[:, 1], '.', color = 'blue')
    pylab.plot(samples3[:, 0], samples3[:, 1],'.', color = 'blue')
    pylab.plot(means[0][0], means[0][1],'*',markersize =10,color = 'red')
    pylab.plot(means[1][0], means[1][1],'*',markersize =10,color = 'red')
    pylab.plot(means[2][0], means[2][1],'*',markersize =10,color = 'red')
    pylab.show()

K = 3
# Initialization: initialization of parameter is fixed to show an example
centroids = np.array([[0.0,0.0],[2.0,2.0],[0.0,7.0]])
centroidsBroadcast = sc.broadcast(centroids)

D = sc.textFile("./data.csv").cache()
iter_num = 0
for i in range(10):  
    res = D.map(nearest_centroid).reduceByKey(lambda x,y : (x[0]+y[0],x[1]+y[1])).collect()
    #res [(0, (array([  2.66546663e+00,   3.94844436e+03]), 1001)  ), 
    #     (2, (array([ 6023.84995923,  5975.48511018]), 1000)), 
    #     (1, (array([ 3986.85984761,    15.93153464]), 999))]
    # res[1][1][1] returns 1000 here
    res = sorted(res,key = lambda x : x[0])  #sort based on clusted ID
    centroids_new = np.array([x[1][0]/x[1][1] for x in res])  #divide by cluster size
    if np.sum(np.absolute(centroids_new-centroids))<0.01:
        break
    print "Iteration" + str(iter_num)
    iter_num = iter_num + 1 
    centroids = centroids_new
    ## Boradcast current centroids
    centroidsBroadcast = sc.broadcast(centroids)
    print centroids
    plot_iteration(centroids)
print "Final Results:"
print centroids


Iteration0
[[ 0.99099125  0.43180383]
 [ 3.99817435  2.64754086]
 [ 1.94352474  5.69010188]]
Iteration1
[[ 1.94151883  0.63405138]
 [ 5.36925166  2.87136098]
 [ 1.98386009  5.30683526]]
Iteration2
[[ 3.06338738  0.17738555]
 [ 6.02026368  4.65153759]
 [ 0.96921615  4.84912046]]
Iteration3
[[ 3.88008006 -0.05367885]
 [ 6.00232999  5.96052332]
 [-0.02524695  4.12587747]]
Iteration4
[[ 3.91681088 -0.04624453]
 [ 5.9928049   5.99899483]
 [-0.03643723  4.09558048]]
Final Results:
[[ 3.91681088 -0.04624453]
 [ 5.9928049   5.99899483]
 [-0.03643723  4.09558048]]

HW11.1 Loss Functions

In the context of binary classification problems, does the linear SVM learning algorithm yield the same result as a L2 penalized logistic regesssion learning algorithm?

In your reponse, please discuss the loss functions, and the learnt models, and separating surfaces between the two classes.

In the context of binary classification problems, does the linear SVM learning algorithm yield the same result as a perceptron learning algorithm?

[OPTIONAL]: generate an artifical binary classification dataset with 2 input features and plot the learnt separating surface for both a linear SVM and for logistic regression. Comment on the learnt surfaces. Please feel free to do this in Python (no need to use Spark).

L2 penalized logistic regression: $$J(w) = \sum_{i}(1+\exp(-y(w^Tx_i+b)))+\lambda||w||^2$$

Linear SVM:
$min\frac{1}{2}||w||^2$ subject to constraints $y_i(w \cdot x_i+b)-1\ge0$ $\forall i=1,...,L$ $$J(w) = \frac{1}{m}\sum_{i}(1-y_i(w^Tx_i-b))_++\frac{\lambda}{2}||w||^2$$

Perceptron learning algorithm:
$$J(W,X_1^L)=\sum_{\{X_i|y_i \langle W,X_i \rangle <0\}}(-W^TX_iy_i)$$

Linear SVM and L2 penalized logistic regression or perceptron learning algorithm do not yield the same results. SVM maximizes the margin between classes (minimizes weights) whereas logistic regression and perceptron learning find any solution that works.


HW11.2 Gradient descent

In the context of logistic regression describe and define three flavors of penalized loss functions. Are these all supported in Spark MLLib (include online references to support your answers)?

Descibe probabilitic interpretations of the L1 and L2 priors for penalized logistic regression (HINT: see synchronous slides for week 11 for details)

Lasso, L1: Penalizes based on the magnitude of the weights

Ridge, L2: Penalizes based on the squared magnitude of the weights

Both L1 and L2 regularization are suppoerted in MLLib.

The probabilistic interpretation of the L1 penalty is a Laplacian prior, whereas for L2 it is a Gaussian, therefore, the L1 regularization drives more weights to zero.

Zero-one loss: Counts the number of mistakes, equal to zero if classified correctly and equal to one if misclassified. Does not have a useful gradient.
$L_{01}(m) = 0$ if $m\ge0$
$L_{01}(m) = 1$ if $m<0$

Hinge loss: Loss term for soft-margin SVM
$L_{hinge}(m) = max(0,1-m)$

Log loss: Equivalent to cross-entropy loss used for logistic regression
$L(m) = log(1+e^{-m})$

Hinge loss and logistic loss are both supported in Spark MLLib for linear SVMs, but only logistic loss is supported for logistic regression.

Reference: http://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression


HW11.3 Logistic Regression

Generate 2 sets of linearly separable data with 100 data points each using the data generation code provided below and plot each in separate plots. Call one the training set and the other the testing set.


In [30]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from numpy.random import rand

def generateData_sep(n):  
    """  
    generates a 2D linearly separable dataset with n samples. 
    The third element of the sample is the label  
    """  
    x1b = (rand(n)*2-1)/2-0.5  
    x2b = (rand(n)*2-1)/2+0.5  
    x1r = (rand(n)*2-1)/2+0.5  
    x2r = (rand(n)*2-1)/2-0.5  
    inputs = []
    for i in range(len(x1b)):  
        inputs.append([x1b[i],x2b[i],1])
        inputs.append([x1r[i],x2r[i],-1])
    #print inputs
    return inputs

def plot_separable_data():
    n = 100
    train = np.array(generateData_sep(n))
    test = np.array(generateData_sep(n))
    f, axarr = plt.subplots(1,2)
    axarr[0].plot(train[train[:,2]==-1,0],train[train[:,2]==-1,1],'bo')
    axarr[0].plot(train[train[:,2]==1,0],train[train[:,2]==1,1],'ro')
    axarr[0].set_title('Training data')
    axarr[0].set_xlabel('x1')
    axarr[0].set_ylabel('x2')
    axarr[1].plot(test[test[:,2]==-1,0],test[test[:,2]==-1,1],'bo')
    axarr[1].plot(test[test[:,2]==1,0],test[test[:,2]==1,1],'ro')
    axarr[1].set_title('Test data')
    axarr[1].set_xlabel('x1')
    axarr[1].set_ylabel('x2')
   
plot_separable_data()


Modify this data generation code to generating non-linearly separable training and testing datasets (with approximately 10% of the data falling on the wrong side of the separating hyperplane. Plot the resulting datasets.

NOTE: For the remainder of this problem please use the non-linearly separable training and testing datasets.

Using MLLib train up a LASSO logistic regression model with the training dataset and evaluate with the testing set. What a good number of iterations for training the logistic regression model? Justify with plots and words.

Derive and implement in Spark a weighted LASSO logistic regression. Implement a convergence test of your choice to check for termination within your training algorithm.

Weight the above training dataset as follows: Weight each example using the inverse vector length (Euclidean norm):

weight(X)= 1/||X||,

where ||X|| = SQRT(X.X)= SQRT(X1^2 + X2^2)

Here X is vector made up of X1 and X2.

Evaluate your homegrown weighted LASSO logistic regression on the test dataset. Report misclassification error (1 - Accuracy) and how many iterations does it took to converge.

Does Spark MLLib have a weighted LASSO logistic regression implementation. If so use it and report your findings on the weighted training set and test set.


In [69]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from numpy.random import rand

'''
Modify this data generation code to generating non-linearly separable 
training and testing datasets (with approximately 10% of the data 
falling on the wrong side of the separating hyperplane. 
Plot the resulting datasets.
'''


def generateData(n, isSeperable):  
    """  
    generates a 2D non-separable dataset with n samples. 
    The third element of the sample is the label  
    """  
    x1b = (rand(n)*2-1)/2-0.5  
    x2b = (rand(n)*2-1)/2+0.5  
    x1r = (rand(n)*2-1)/2+0.5  
    x2r = (rand(n)*2-1)/2-0.5  
    inputs = []
    for i in range(len(x1b)):  
        inputs.append([x1b[i],x2b[i],1])
        inputs.append([x1r[i],x2r[i],0])
    inputs = np.array(inputs)
    if not isSeperable:
        wrongidx = np.random.choice(n,0.1*n, replace=False)
        inputs[wrongidx,2]=(1-inputs[wrongidx,2])**2
    return inputs

def plot_nonseparable_data():
    n = 100
    train = generateData(n, isSeperable=False)
    test = generateData(n, isSeperable=False)
    
    np.savetxt('train.csv',train,delimiter = ',')
    np.savetxt('test.csv',test,delimiter = ',')
    
    f, axarr = plt.subplots(1,2)
    axarr[0].plot(train[train[:,2]==0,0],train[train[:,2]==0,1],'bo')
    axarr[0].plot(train[train[:,2]==1,0],train[train[:,2]==1,1],'ro')
    axarr[0].set_title('Training data')
    axarr[0].set_xlabel('x1')
    axarr[0].set_ylabel('x2')
    axarr[1].plot(test[test[:,2]==0,0],test[test[:,2]==0,1],'bo')
    axarr[1].plot(test[test[:,2]==1,0],test[test[:,2]==1,1],'ro')
    axarr[1].set_title('Test data')
    axarr[1].set_xlabel('x1')
    axarr[1].set_ylabel('x2')
    
    return None

    
plot_nonseparable_data()



In [51]:
!head train.csv


-1.951490696725313168e-01,3.404348121414211281e-01,1.000000000000000000e+00
6.523549010040190499e-01,-1.395700459946421779e-01,0.000000000000000000e+00
-5.609637949447165983e-01,8.957818556081109662e-01,1.000000000000000000e+00
6.613161160314987574e-01,-6.856900100951484056e-01,0.000000000000000000e+00
-9.499876028104793368e-01,9.738258827547474628e-01,1.000000000000000000e+00
6.955570149760982313e-01,-2.532897105232218005e-01,0.000000000000000000e+00
-2.740108201460167425e-02,2.137977634712210362e-01,1.000000000000000000e+00
4.865875414694887935e-01,-5.188783398217240439e-01,0.000000000000000000e+00
-8.989293209280633246e-01,7.689179951171594141e-01,1.000000000000000000e+00
4.231400123242646805e-01,-6.572453340482935502e-01,1.000000000000000000e+00

In [70]:
'''
Using MLLib train a L1 logistic regression model with the training dataset 
and evaluate with the testing set. 
What is a good number of iterations for training the logistic regression model? 
Justify with plots and words.
'''

from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[2], values[:2])

traindatalines = sc.textFile("train.csv")
traindata = traindatalines.map(parsePoint)
#print traindata.collect()

testdatalines = sc.textFile("train.csv")
testdata = testdatalines.map(parsePoint)

# Build the model
model = LogisticRegressionWithSGD.train(traindata, iterations=10, regType="l1")

# Evaluating the model on training data
labelsAndPreds = testdata.map(lambda p: (p.label, model.predict(p.features)))
testErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(testdata.count())
print("Testing Error = " + str(testErr))


Testing Error = 0.05

In [ ]:
'''
Derive and implement in Spark a weighted LASSO logistic regression. 
Implement a convergence test of your choice to check for termination 
within your training algorithm.

Weight the above training dataset as follows:
Weight each example using the inverse vector length (Euclidean norm):
weight(X)= 1/||X||,
where ||X|| = SQRT(X.X)= SQRT(X1^2 + X2^2)
Here X is vector made up of X1 and X2.

Evaluate your homegrown weighted LASSO logistic regression on the test dataset. 
Report misclassification error (1 - Accuracy) and 
how many iterations does it take to converge.'''


def logisticRegressionGDReg(data, wInitial=None, learningRate=0.05, iterations=50, regParam=0.01, regType="Lasso"):
    featureLen = len(data.take(1)[0].x)
    n = data.count()
    if wInitial is None:
        w = np.random.normal(size=featureLen) # w should be broadcasted if it is large
    else:
        w = wInitial
    for i in range(iterations):
        wBroadcast = sc.broadcast(w)
        gradient = data.map(lambda p: (1 / (1 + np.exp(-p.y*np.dot(wBroadcast.value, p.x)))-1) * p.y * np.array(p.x))\
                    .reduce(lambda a, b: a + b)
        if regType == "Ridge":
            wReg = w * 1
            wReg[-1] = 0 #last value of weight vector is bias term, ignored in regularization
        elif regType == "Lasso":
            wReg = w * 1
            wReg[-1] = 0 #last value of weight vector is bias term, ignored in regularization
            wReg = (wReg>0).astype(int) * 2-1
        else:
            wReg = np.zeros(w.shape[0])
        gradient = gradient + regParam * wReg  #gradient:  GD of Sqaured Error+ GD of regularized term 
        w = w - learningRate * gradient / n
    return w

HW11.4 SVMs Use the non-linearly separable training and testing datasets from HW11.3 in this problem.

Using MLLib train up a soft SVM model with the training dataset and evaluate with the testing set. What is a good number of iterations for training the SVM model? Justify with plots and words.

Derive and Implement in Spark a weighted soft linear svm classification learning algorithm.
Evaluate your homegrown weighted soft linear svm classification learning algorithm on the weighted training dataset and test dataset from HW11.3. Report misclassification error (1 - Accuracy) and how many iterations does it took to converge? How many support vectors do you end up?

Does Spark MLLib have a weighted soft SVM learner. If so use it and report your findings on the weighted training set and test set.


In [ ]:


11.5 [OPTIONAL] Distributed Perceptron algorithm.

Using the following papers as background:
http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en//pubs/archive/36266.pdf

https://www.dropbox.com/s/a5pdcp0r8ptudgj/gesmundo-tomeh-eacl-2012.pdf?dl=0

http://www.slideshare.net/matsubaray/distributed-perceptron

Implement each of the following flavors of perceptron learning algorithm:

  1. Serial (All Data): This is the classifier returned if trained serially on all the available data. On a single computer for example (Mistake driven)
  2. Serial (Sub Sampling): Shard the data, select one shard randomly and train serially.
  3. Parallel (Parameter Mix): Learn a perceptron locally on each shard: Once learning is complete combine each learnt percepton using a uniform weighting
  4. Parallel (Iterative Parameter Mix) as described in the above papers.

11.6 [OPTIONAL: consider doing this in a group] Evalution perceptron algorihtms on PennTreeBank POS corpus
Reproduce the experiments reported in the following paper:

HadoopPerceptron: a Toolkit for Distributed Perceptron Training and Prediction with MapReduce
Andrea Gesmundo and Nadi Tomeh

http://www.aclweb.org/anthology/E12-2020

These experiments focus on the prediction accuracy on a part-of-speech (POS) task using the PennTreeBank corpus. They use sections 0-18 of the WallStreet Journal for training, and sections 22-24 for testing.

HW11.7 [OPTIONAL: consider doing this in a group] Kernal Adatron

Implement the Kernal Adatron in Spark (contact Jimi for details)


In [ ]: