Start Spark
In [1]:
%cd ~/Documents/W261/hw11/
In [2]:
import os
import sys
spark_home = os.environ['SPARK_HOME'] = \
'/Users/davidadams/packages/spark-1.5.1-bin-hadoop2.6/'
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0,os.path.join(spark_home,'python'))
sys.path.insert(0,os.path.join(spark_home,'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home,'python/pyspark/shell.py'))
HW11.0 Broadcast versus Caching in Spark
What is the difference between broadcasting and caching data in Spark? Give an example (in the context of machine learning) of each mechanism (at a high level). Feel free to cut and paste code examples from the lectures to support your answer.
Review the following Spark-notebook-based implementation of KMeans and use the broadcast pattern to make this implementation more efficient. Please describe your changes in English first, implement, comment your code and highlight your changes:
http://nbviewer.ipython.org/urls/dl.dropbox.com/s/41q9lgyqhy8ed5g/EM-Kmeans.ipynb
Caching and broadcasting both place data in the memory of the worker nodes. Caching persists an RDD's partitions on the workers that compute them so the RDD can be reused across actions, whereas broadcasting sends one read-only copy of a value to each worker. Caching is typically used for intermediate data that is reused in later iterations (e.g. the training data), whereas broadcasting is used for a shared variable that every task needs (e.g. regression weights).
For the KMeans implementation in Spark, I added a broadcast of the updated centroids at each iteration. The nearest-centroid function then reads the broadcast value of the centroids instead of having the centroids serialized and shipped with every task closure.
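To make the distinction concrete, here is a minimal illustrative sketch (the file and variable names are placeholders, not lecture code): the RDD is cached for reuse across passes, while the weight vector is broadcast once and read inside the map closure.
In [ ]:
import numpy as np
# Caching: persist the training-data RDD in worker memory so that repeated
# passes of an iterative algorithm do not re-read it from disk.
trainRDD = sc.textFile("data.csv").cache()
# Broadcasting: ship one read-only copy of a shared value (e.g. regression
# weights) to every worker instead of re-serializing it into every task closure.
weightsBC = sc.broadcast(np.zeros(3))
scores = trainRDD.map(lambda line: np.dot(weightsBC.value,
                                          np.array([float(f) for f in line.split(',')] + [1.0])))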
In [3]:
%matplotlib inline
import numpy as np
import pylab
import json
def generateData():
    size1 = size2 = size3 = 1000
    samples1 = np.random.multivariate_normal([4, 0], [[1, 0], [0, 1]], size1)
    data = samples1
    samples2 = np.random.multivariate_normal([6, 6], [[1, 0], [0, 1]], size2)
    data = np.append(data, samples2, axis=0)
    samples3 = np.random.multivariate_normal([0, 4], [[1, 0], [0, 1]], size3)
    data = np.append(data, samples3, axis=0)
    # Shuffle the data into random order
    data = data[np.random.permutation(size1+size2+size3),]
    np.savetxt('data.csv', data, delimiter=',')
    return None
generateData()
# Calculate which cluster each data point belongs to
def nearest_centroid(line):
    x = np.array([float(f) for f in line.split(',')])
    ## Use the broadcast value of the centroids
    closest_centroid_idx = np.sum((x - centroidsBroadcast.value)**2, axis=1).argmin()
    return (closest_centroid_idx, (x, 1))
# plot centroids and data points for each iteration
def plot_iteration(means):
    # reload the generated points from data.csv (the samples are not kept in scope)
    data = np.loadtxt('data.csv', delimiter=',')
    pylab.plot(data[:, 0], data[:, 1], '.', color='blue')
    pylab.plot(means[0][0], means[0][1], '*', markersize=10, color='red')
    pylab.plot(means[1][0], means[1][1], '*', markersize=10, color='red')
    pylab.plot(means[2][0], means[2][1], '*', markersize=10, color='red')
    pylab.show()
K = 3
# Initialization: the initial centroids are fixed here to show an example
centroids = np.array([[0.0,0.0],[2.0,2.0],[0.0,7.0]])
centroidsBroadcast = sc.broadcast(centroids)
D = sc.textFile("./data.csv").cache()
iter_num = 0
for i in range(10):
    res = D.map(nearest_centroid).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])).collect()
    # res: [(0, (array([ 2.66546663e+00, 3.94844436e+03]), 1001)),
    #       (2, (array([ 6023.84995923, 5975.48511018]), 1000)),
    #       (1, (array([ 3986.85984761, 15.93153464]), 999))]
    # res[1][1][1] returns 1000 here
    res = sorted(res, key=lambda x: x[0])  # sort based on cluster ID
    centroids_new = np.array([x[1][0]/x[1][1] for x in res])  # divide the coordinate sums by the cluster sizes
    if np.sum(np.absolute(centroids_new - centroids)) < 0.01:
        break
    print "Iteration " + str(iter_num)
    iter_num = iter_num + 1
    centroids = centroids_new
    ## Broadcast the current centroids so nearest_centroid reads the updated values
    centroidsBroadcast = sc.broadcast(centroids)
    print centroids
    plot_iteration(centroids)
print "Final Results:"
print centroids
HW11.1 Loss Functions
In the context of binary classification problems, does the linear SVM learning algorithm yield the same result as an L2 penalized logistic regression learning algorithm?
In your response, please discuss the loss functions, the learnt models, and the separating surfaces between the two classes.
In the context of binary classification problems, does the linear SVM learning algorithm yield the same result as a perceptron learning algorithm?
[OPTIONAL]: generate an artificial binary classification dataset with 2 input features and plot the learnt separating surface for both a linear SVM and for logistic regression. Comment on the learnt surfaces. Please feel free to do this in Python (no need to use Spark).
L2 penalized logistic regression: $$J(w) = \sum_{i}\log\left(1+\exp(-y_i(w^Tx_i+b))\right)+\lambda||w||^2$$
Linear SVM:
$min\frac{1}{2}||w||^2$ subject to constraints $y_i(w \cdot x_i+b)-1\ge0$ $\forall i=1,...,L$
$$J(w) = \frac{1}{m}\sum_{i}\left(1-y_i(w^Tx_i+b)\right)_+ + \frac{\lambda}{2}||w||^2$$
Perceptron learning algorithm:
$$J(W,X_1^L)=\sum_{\{X_i|y_i \langle W,X_i \rangle <0\}}(-W^TX_iy_i)$$
Linear SVM, L2 penalized logistic regression, and the perceptron do not in general yield the same results. The SVM's hinge loss produces the maximum-margin separating hyperplane, which is determined only by the support vectors. L2 penalized logistic regression minimizes a smooth log loss over all training points, so every point influences the learnt weights and the resulting hyperplane is generally close to, but not identical to, the SVM's. The perceptron stops at any hyperplane that separates the training data (when one exists), so its solution depends on the initialization and the order of the updates and carries no margin guarantee. All three learn linear separating surfaces, but the surfaces themselves generally differ.
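[OPTIONAL] A sketch of the plotting exercise in plain Python, assuming scikit-learn is available (LinearSVC and LogisticRegression below are the scikit-learn estimators, not MLLib): both learn linear separating surfaces on an artificial two-blob dataset, but the SVM places its line to maximize the margin while logistic regression minimizes the log loss over all points, so the two lines generally differ slightly.
In [ ]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression

# artificial binary dataset with 2 input features: two Gaussian blobs
np.random.seed(0)
X = np.vstack([np.random.randn(100, 2) + [2, 2],
               np.random.randn(100, 2) - [2, 2]])
y = np.array([1] * 100 + [-1] * 100)

for name, clf in [('linear SVM', LinearSVC(C=1.0)),
                  ('logistic regression', LogisticRegression(penalty='l2'))]:
    clf.fit(X, y)
    w = clf.coef_[0]
    b = clf.intercept_[0]
    xs = np.linspace(X[:, 0].min(), X[:, 0].max(), 100)
    # separating surface: w0*x1 + w1*x2 + b = 0  =>  x2 = -(w0*x1 + b)/w1
    plt.plot(xs, -(w[0] * xs + b) / w[1], label=name)

plt.plot(X[y == 1, 0], X[y == 1, 1], 'r.')
plt.plot(X[y == -1, 0], X[y == -1, 1], 'b.')
plt.legend()
plt.show()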
HW11.2 Gradient descent
In the context of logistic regression, describe and define three flavors of penalized loss functions. Are these all supported in Spark MLLib (include online references to support your answers)?
Describe probabilistic interpretations of the L1 and L2 priors for penalized logistic regression (HINT: see synchronous slides for week 11 for details).
Lasso, L1: penalizes the sum of the absolute values of the weights, $\lambda\sum_j|w_j|$.
Ridge, L2: penalizes the sum of the squared weights, $\lambda\sum_j w_j^2$.
Both L1 and L2 regularization are supported in MLLib.
The probabilistic interpretation of the L1 penalty is a Laplacian (double-exponential) prior on the weights, whereas the L2 penalty corresponds to a Gaussian prior. Because the Laplacian prior places more mass at zero, L1 regularization drives more weights exactly to zero and yields sparser models.
Zero-one loss: Counts the number of mistakes, equal to zero if classified correctly and equal to one if misclassified. Does not have a useful gradient.
$L_{01}(m) = 0$ if $m\ge0$
$L_{01}(m) = 1$ if $m<0$
Hinge loss: loss term for the soft-margin SVM
$L_{hinge}(m) = \max(0, 1-m)$
Log loss: equivalent to the cross-entropy loss used for logistic regression
$L_{log}(m) = \log(1+e^{-m})$
Of these, Spark MLLib supports hinge loss (used by its linear SVM) and logistic loss (used by logistic regression); zero-one loss is not supported since it does not have a useful gradient.
Reference: http://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression
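For reference, a minimal sketch of how the penalty is selected in MLLib's SGD-based logistic regression via the regType argument (trainData is a placeholder name for an RDD of LabeledPoints):
In [ ]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

# regType=None -> no penalty, "l1" -> LASSO penalty, "l2" -> ridge penalty
modelNone = LogisticRegressionWithSGD.train(trainData, iterations=100, regType=None)
modelL1 = LogisticRegressionWithSGD.train(trainData, iterations=100, regType="l1", regParam=0.01)
modelL2 = LogisticRegressionWithSGD.train(trainData, iterations=100, regType="l2", regParam=0.01)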
HW11.3 Logistic Regression
Generate 2 sets of linearly separable data with 100 data points each using the data generation code provided below and plot each in separate plots. Call one the training set and the other the testing set.
In [30]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from numpy.random import rand
def generateData_sep(n):
    """
    Generates a 2D linearly separable dataset with n samples per class.
    The third element of each sample is the label.
    """
    x1b = (rand(n)*2-1)/2-0.5
    x2b = (rand(n)*2-1)/2+0.5
    x1r = (rand(n)*2-1)/2+0.5
    x2r = (rand(n)*2-1)/2-0.5
    inputs = []
    for i in range(len(x1b)):
        inputs.append([x1b[i], x2b[i], 1])
        inputs.append([x1r[i], x2r[i], -1])
    #print inputs
    return inputs

def plot_separable_data():
    n = 100
    train = np.array(generateData_sep(n))
    test = np.array(generateData_sep(n))
    f, axarr = plt.subplots(1, 2)
    axarr[0].plot(train[train[:,2]==-1,0], train[train[:,2]==-1,1], 'bo')
    axarr[0].plot(train[train[:,2]==1,0], train[train[:,2]==1,1], 'ro')
    axarr[0].set_title('Training data')
    axarr[0].set_xlabel('x1')
    axarr[0].set_ylabel('x2')
    axarr[1].plot(test[test[:,2]==-1,0], test[test[:,2]==-1,1], 'bo')
    axarr[1].plot(test[test[:,2]==1,0], test[test[:,2]==1,1], 'ro')
    axarr[1].set_title('Test data')
    axarr[1].set_xlabel('x1')
    axarr[1].set_ylabel('x2')

plot_separable_data()
Modify this data generation code to generate non-linearly separable training and testing datasets (with approximately 10% of the data falling on the wrong side of the separating hyperplane). Plot the resulting datasets.
NOTE: For the remainder of this problem please use the non-linearly separable training and testing datasets.
Using MLLib, train a LASSO logistic regression model with the training dataset and evaluate it with the testing set. What is a good number of iterations for training the logistic regression model? Justify with plots and words.
Derive and implement in Spark a weighted LASSO logistic regression. Implement a convergence test of your choice to check for termination within your training algorithm.
Weight the above training dataset as follows: weight each example using the inverse vector length (Euclidean norm):
weight(X) = 1/||X||,
where ||X|| = SQRT(X.X) = SQRT(X1^2 + X2^2).
Here X is the vector made up of X1 and X2.
Evaluate your homegrown weighted LASSO logistic regression on the test dataset. Report the misclassification error (1 - Accuracy) and how many iterations it took to converge.
Does Spark MLLib have a weighted LASSO logistic regression implementation? If so, use it and report your findings on the weighted training set and test set.
In [69]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
from numpy.random import rand
'''
Modify this data generation code to generate non-linearly separable
training and testing datasets (with approximately 10% of the data
falling on the wrong side of the separating hyperplane).
Plot the resulting datasets.
'''
def generateData(n, isSeparable):
    """
    Generates a 2D dataset with 2n samples (non-separable when isSeparable is False).
    The third element of each sample is the label.
    """
    x1b = (rand(n)*2-1)/2-0.5
    x2b = (rand(n)*2-1)/2+0.5
    x1r = (rand(n)*2-1)/2+0.5
    x2r = (rand(n)*2-1)/2-0.5
    inputs = []
    for i in range(len(x1b)):
        inputs.append([x1b[i], x2b[i], 1])
        inputs.append([x1r[i], x2r[i], 0])
    inputs = np.array(inputs)
    if not isSeparable:
        # flip the labels of ~10% of the 2n points so they fall on the wrong side
        wrongidx = np.random.choice(2*n, int(0.1*2*n), replace=False)
        inputs[wrongidx, 2] = 1 - inputs[wrongidx, 2]
    return inputs

def plot_nonseparable_data():
    n = 100
    train = generateData(n, isSeparable=False)
    test = generateData(n, isSeparable=False)
    np.savetxt('train.csv', train, delimiter=',')
    np.savetxt('test.csv', test, delimiter=',')
    f, axarr = plt.subplots(1, 2)
    axarr[0].plot(train[train[:,2]==0,0], train[train[:,2]==0,1], 'bo')
    axarr[0].plot(train[train[:,2]==1,0], train[train[:,2]==1,1], 'ro')
    axarr[0].set_title('Training data')
    axarr[0].set_xlabel('x1')
    axarr[0].set_ylabel('x2')
    axarr[1].plot(test[test[:,2]==0,0], test[test[:,2]==0,1], 'bo')
    axarr[1].plot(test[test[:,2]==1,0], test[test[:,2]==1,1], 'ro')
    axarr[1].set_title('Test data')
    axarr[1].set_xlabel('x1')
    axarr[1].set_ylabel('x2')
    return None

plot_nonseparable_data()
In [51]:
!head train.csv
In [70]:
'''
Using MLLib train a L1 logistic regression model with the training dataset
and evaluate with the testing set.
What is a good number of iterations for training the logistic regression model?
Justify with plots and words.
'''
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[2], values[:2])

traindatalines = sc.textFile("train.csv")
traindata = traindatalines.map(parsePoint)
#print traindata.collect()
testdatalines = sc.textFile("test.csv")
testdata = testdatalines.map(parsePoint)

# Build the model
model = LogisticRegressionWithSGD.train(traindata, iterations=10, regType="l1")

# Evaluate the model on the test data
labelsAndPreds = testdata.map(lambda p: (p.label, model.predict(p.features)))
testErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(testdata.count())
print("Testing Error = " + str(testErr))
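To justify the number of iterations, one approach (a sketch reusing the traindata and testdata RDDs defined above; testError is a helper introduced here) is to train over a grid of iteration counts and plot the test error: the error typically drops quickly and then flattens, and the smallest count on the flat part is a reasonable choice.
In [ ]:
import matplotlib.pyplot as plt

def testError(model, data):
    labelsAndPreds = data.map(lambda p: (p.label, model.predict(p.features)))
    return labelsAndPreds.filter(lambda (v, p): v != p).count() / float(data.count())

iterGrid = [1, 5, 10, 25, 50, 100, 200]
errors = [testError(LogisticRegressionWithSGD.train(traindata, iterations=it, regType="l1"), testdata)
          for it in iterGrid]

plt.plot(iterGrid, errors, 'o-')
plt.xlabel('training iterations')
plt.ylabel('test misclassification error')
plt.show()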
In [ ]:
'''
Derive and implement in Spark a weighted LASSO logistic regression.
Implement a convergence test of your choice to check for termination
within your training algorithm.
Weight the above training dataset as follows:
Weight each example using the inverse vector length (Euclidean norm):
weight(X)= 1/||X||,
where ||X|| = SQRT(X.X)= SQRT(X1^2 + X2^2)
Here X is vector made up of X1 and X2.
Evaluate your homegrown weighted LASSO logistic regression on the test dataset.
Report the misclassification error (1 - Accuracy) and
how many iterations it takes to converge.'''
def logisticRegressionGDReg(data, wInitial=None, learningRate=0.05, iterations=50, regParam=0.01, regType="Lasso"):
    # Assumes each element of data has attributes .x (feature vector whose last
    # entry is a constant bias feature) and .y (a label in {-1, +1}).
    featureLen = len(data.take(1)[0].x)
    n = data.count()
    if wInitial is None:
        w = np.random.normal(size=featureLen)  # w should be broadcast if it is large
    else:
        w = wInitial
    for i in range(iterations):
        wBroadcast = sc.broadcast(w)
        # gradient of the log loss summed over all examples
        gradient = data.map(lambda p: (1 / (1 + np.exp(-p.y*np.dot(wBroadcast.value, p.x))) - 1) * p.y * np.array(p.x))\
                       .reduce(lambda a, b: a + b)
        if regType == "Ridge":
            wReg = w * 1
            wReg[-1] = 0  # last entry of the weight vector is the bias term, excluded from regularization
        elif regType == "Lasso":
            wReg = w * 1
            wReg[-1] = 0  # last entry of the weight vector is the bias term, excluded from regularization
            wReg = np.sign(wReg)  # subgradient of the L1 penalty (sign(0) = 0, so the bias stays unregularized)
        else:
            wReg = np.zeros(w.shape[0])
        gradient = gradient + regParam * wReg  # gradient of the log loss plus gradient of the penalty term
        w = w - learningRate * gradient / n
    return w
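The cell above gives the core regularized gradient-descent loop. One way to complete the weighted version (a sketch; Point, parseWeightedPoint, and trainWeightedLasso are names introduced here, not MLLib APIs) is to precompute the 1/||X|| example weights, append a constant bias feature, map the 0/1 labels to +1/-1 to match the log-loss gradient used above, and stop once the weight update falls below a tolerance:
In [ ]:
from collections import namedtuple
import numpy as np

# x includes a trailing 1.0 bias feature, y is in {-1,+1}, w is the example weight 1/||X||
Point = namedtuple('Point', ['x', 'y', 'w'])

def parseWeightedPoint(line):
    v = [float(f) for f in line.split(',')]
    features = np.array(v[:2])
    weight = 1.0 / np.linalg.norm(features)   # weight(X) = 1/||X||
    y = 2.0 * v[2] - 1.0                      # map {0,1} labels to {-1,+1}
    return Point(np.append(features, 1.0), y, weight)

def trainWeightedLasso(data, learningRate=0.05, maxIterations=200, regParam=0.01, tol=1e-4):
    n = data.count()
    w = np.zeros(len(data.take(1)[0].x))
    for i in range(maxIterations):
        wb = sc.broadcast(w)
        # weighted gradient of the log loss, summed over the RDD
        gradient = data.map(lambda p: p.w * (1.0 / (1.0 + np.exp(-p.y * np.dot(wb.value, p.x))) - 1.0)
                                      * p.y * p.x).reduce(lambda a, b: a + b)
        wReg = np.sign(w)
        wReg[-1] = 0.0                        # do not regularize the bias term
        wNew = w - learningRate * (gradient / n + regParam * wReg)
        if np.sum(np.abs(wNew - w)) < tol:    # convergence test on the size of the update
            return wNew, i + 1
        w = wNew
    return w, maxIterations

weightedTrain = sc.textFile("train.csv").map(parseWeightedPoint).cache()
weightedTest = sc.textFile("test.csv").map(parseWeightedPoint).cache()
wFinal, numIter = trainWeightedLasso(weightedTrain)
errors = weightedTest.map(lambda p: int(p.y * np.dot(wFinal, p.x) <= 0)).sum()
print "Converged after " + str(numIter) + " iterations"
print "Test misclassification error = " + str(errors / float(weightedTest.count()))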
HW11.4 SVMs
Use the non-linearly separable training and testing datasets from HW11.3 in this problem.
Using MLLib, train a soft-margin SVM model with the training dataset and evaluate it with the testing set. What is a good number of iterations for training the SVM model? Justify with plots and words.
Derive and implement in Spark a weighted soft-margin linear SVM classification learning algorithm.
Evaluate your homegrown weighted soft-margin linear SVM on the weighted training dataset and test dataset from HW11.3. Report the misclassification error (1 - Accuracy), how many iterations it took to converge, and how many support vectors you end up with.
Does Spark MLLib have a weighted soft-margin SVM learner? If so, use it and report your findings on the weighted training set and test set.
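A sketch of the MLLib portion (reusing parsePoint from HW11.3 and the train.csv/test.csv files written above; the iteration count and regParam values are placeholder choices): SVMWithSGD trains a soft-margin linear SVM using hinge loss with an L2 penalty, and the same iteration-sweep idea from HW11.3 can be reused to pick a good number of iterations.
In [ ]:
from pyspark.mllib.classification import SVMWithSGD

svmTrain = sc.textFile("train.csv").map(parsePoint).cache()
svmTest = sc.textFile("test.csv").map(parsePoint).cache()

# soft-margin linear SVM: hinge loss with an L2 penalty controlled by regParam
svmModel = SVMWithSGD.train(svmTrain, iterations=100, regParam=0.01)

svmErr = (svmTest.map(lambda p: (p.label, svmModel.predict(p.features)))
                 .filter(lambda (v, p): v != p).count() / float(svmTest.count()))
print "SVM test misclassification error = " + str(svmErr)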
In [ ]:
HW11.5 [OPTIONAL] Distributed Perceptron algorithm.
Using the following papers as background:
http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en//pubs/archive/36266.pdf
https://www.dropbox.com/s/a5pdcp0r8ptudgj/gesmundo-tomeh-eacl-2012.pdf?dl=0
http://www.slideshare.net/matsubaray/distributed-perceptron
Implement each of the following flavors of perceptron learning algorithm:
HW11.6 [OPTIONAL: consider doing this in a group] Evaluation of perceptron algorithms on the PennTreeBank POS corpus
Reproduce the experiments reported in the following paper:
HadoopPerceptron: a Toolkit for Distributed Perceptron Training and Prediction with MapReduce
Andrea Gesmundo and Nadi Tomeh
http://www.aclweb.org/anthology/E12-2020
These experiments focus on prediction accuracy for a part-of-speech (POS) tagging task using the PennTreeBank corpus. They use sections 0-18 of the Wall Street Journal for training, and sections 22-24 for testing.
HW11.7 [OPTIONAL: consider doing this in a group] Kernel Adatron
Implement the Kernel Adatron in Spark (contact Jimi for details)
In [ ]: