Start Spark
In [1]:
%cd ~/Documents/W261/hw13/
In [1]:
import os
import sys

spark_home = os.environ['SPARK_HOME'] = \
    '/Users/davidadams/packages/spark-1.5.1-bin-hadoop2.6/'
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
HW 13.1: Spark implementation of basic PageRank
Write a basic Spark implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input.
Make sure that your implementation utilizes teleportation ((1 − damping factor) / the number of nodes in the network) and, further, distributes the mass of dangling nodes with each iteration so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page, chooses the next page to which it will move by clicking at random, with probability d, one of the hyperlinks in the current page. This probability is represented by a so-called ‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer jumps to any web page in the network. If a page is a dangling end, meaning it has no outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform distribution and “teleports” to that page]
In your Spark solution, please use broadcast variables and caching to make sure your code is as efficient as possible.
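For reference, the per-node update applied in each iteration below combines teleportation, the redistributed dangling mass, and the mass a node receives from its in-links. A minimal numeric sketch follows; the values for m and mass_in are made up for illustration, while alpha and G match the toy graph used in this assignment:
# Illustrative only: one node's PageRank update with teleportation and
# redistributed dangling mass. The mass values below are made-up examples.
alpha = 0.15       # teleportation factor (1 - damping factor d)
G = 11             # number of nodes in the PageRank-test graph
m = 0.05           # total dangling mass collected this iteration (example)
mass_in = 0.12     # mass this node received from its in-links (example)

new_pr = alpha * (1.0 / G) + (1.0 - alpha) * (1.0 * m / G + mass_in)
print new_pr       # this node's updated PageRank mass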
As you build your code, use the following test data to check your implementation:
s3://ucb-mids-mls-networks/PageRank-test.txt
Set the teleportation parameter to 0.15 (1 − d, where the damping factor d is set to 0.85), and cross-check your work with the true result, displayed in the first image in the Wikipedia article:
https://en.wikipedia.org/wiki/PageRank
and here, for reference, are the corresponding PageRank probabilities:
A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016
Run this experiment locally first. Report the local configuration that you used and how long in minutes and seconds it takes to complete your job.
Repeat this experiment on AWS. Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete your job. (in your notebook, cat the cluster config file)
In [61]:
from ast import literal_eval
from operator import add
from pprint import pprint
from time import time
def sendWeights(pair):
    # pair = (node, (current PageRank mass, adjacency dict or None))
    wt = pair[1][0]
    if pair[1][1] is None:
        # Dangling node: route its entire mass to the special '*' key
        return [('*', wt)]
    destnodes = pair[1][1].keys()
    numDestNodes = len(destnodes)
    # Split this node's mass evenly among its out-links
    return [(n, 1.0 * wt / numDestNodes) for n in destnodes]
def collectNewGraph(pair):
    # pair = (node, (incoming mass or None, (old PageRank, adjacency dict)))
    G = numNodesBroadcast.value
    M = danglingMassBroadcast.value
    alpha = 0.15
    key = pair[0]
    PR = pair[1][0]
    if PR is None:
        # Node received no in-link mass this iteration
        PR = 0.0
    (old_pr, edges) = pair[1][1]
    # Teleportation plus the damped share of dangling mass and in-link mass
    PR_adj = alpha * (1.0 / G) + (1.0 - alpha) * (1.0 * M / G + PR)
    return (key, (PR_adj, edges))
starttime = time()

graphfilename = "./PageRank-test.txt"
graph_input = sc.textFile(graphfilename).map(lambda x: x.split('\t')) \
                .map(lambda pair: (pair[0], literal_eval(pair[1])))  # .cache()
numPartitions = graph_input.getNumPartitions()

# Build the full node set: source nodes plus nodes that only appear as destinations
nodes1 = graph_input.map(lambda pair: (pair[0], 1))
nodes2 = graph_input.flatMap(lambda pair: pair[1].keys()).map(lambda n: (n, 1))
allNodes = nodes1.union(nodes2).map(lambda p: (p[0], None)).distinct()
numNodes = allNodes.count()
numNodesBroadcast = sc.broadcast(numNodes)

# Initialize every node with uniform mass 1/N; dangling nodes keep edges == None
graph = allNodes.leftOuterJoin(graph_input, numPartitions=numPartitions) \
                .map(lambda pair: (pair[0], (1.0 / numNodesBroadcast.value, pair[1][1])))  # .cache()

numIters = 50
for i in range(numIters):
    print 'iteration:', i + 1
    # Distribute each node's mass along its out-links; dangling mass accumulates under '*'
    PR = graph.flatMap(sendWeights).reduceByKey(lambda x, y: x + y).cache()
    danglingMass = PR.filter(lambda pair: pair[0] == '*').collect()[0][1]
    danglingMassBroadcast = sc.broadcast(danglingMass)
    # Reattach adjacency lists and apply teleportation + dangling-mass redistribution
    graph = PR.rightOuterJoin(graph, numPartitions=numPartitions).map(collectNewGraph)

print '\n\n'
pprint(graph.collect())

runtime = time() - starttime
print("elapsed time: %d minutes, %.2f seconds" % (int(runtime // 60), runtime % 60))
In [119]:
correctweights = {'A':0.033, 'B':0.384, 'C':0.343 , 'D':0.039, 'E':0.081, 'F':0.039,
'G':0.016, 'H':0.016, 'I':0.016, 'J':0.016, 'K':0.016}
pprint(correctweights)
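A quick way to sanity-check the converged values against these reference weights is sketched below; it assumes the `graph` RDD from the iteration loop above is still in scope, and the 0.001 tolerance is an arbitrary choice to absorb rounding in the reference values.
# Minimal sanity check (a sketch): compare the converged PageRank values held in
# the `graph` RDD from the previous cell against the reference weights above.
final_pr = dict((node, pr) for node, (pr, _) in graph.collect())
for node in sorted(correctweights):
    computed = final_pr.get(node, 0.0)
    status = 'OK' if abs(computed - correctweights[node]) < 0.001 else 'CHECK'
    print node, round(computed, 3), correctweights[node], status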
In [149]:
%%writefile pagerank.py
import sys
from pyspark import SparkContext
from ast import literal_eval
from operator import add
from pprint import pprint
from time import time


def sendWeights(pair):
    # pair = (node, (current PageRank mass, adjacency dict or None))
    wt = pair[1][0]
    if pair[1][1] is None:
        # Dangling node: route its entire mass to the special '*' key
        return [('*', wt)]
    destnodes = pair[1][1].keys()
    numDestNodes = len(destnodes)
    return [(n, 1.0 * wt / numDestNodes) for n in destnodes]


def collectNewGraph(pair):
    # pair = (node, (incoming mass or None, (old PageRank, adjacency dict)))
    G = numNodesBroadcast.value
    M = danglingMassBroadcast.value
    alpha = 0.15
    key = pair[0]
    PR = pair[1][0]
    if PR is None:
        PR = 0.0
    (old_pr, edges) = pair[1][1]
    PR_adj = alpha * (1.0 / G) + (1.0 - alpha) * (1.0 * M / G + PR)
    return (key, (PR_adj, edges))


if __name__ == "__main__":
    sc = SparkContext(appName="PageRank")

    graphfilename = sys.argv[1]
    graph_input = sc.textFile(graphfilename).map(lambda x: x.split('\t')) \
                    .map(lambda pair: (pair[0], literal_eval(pair[1])))  # .cache()
    numPartitions = graph_input.getNumPartitions()
    numPartitionsBroadcast = sc.broadcast(numPartitions)

    nodes1 = graph_input.map(lambda pair: (pair[0], 1))
    nodes2 = graph_input.flatMap(lambda pair: pair[1].keys()).map(lambda n: (n, 1))
    allNodes = nodes1.union(nodes2).map(lambda p: (p[0], None)).distinct()
    numNodes = allNodes.count()
    numNodesBroadcast = sc.broadcast(numNodes)

    graph = allNodes.leftOuterJoin(graph_input, numPartitions=numPartitionsBroadcast.value) \
                    .map(lambda pair: (pair[0], (1.0 / numNodesBroadcast.value, pair[1][1])))  # .cache()

    numIters = 40
    for i in range(numIters):
        PR = graph.flatMap(sendWeights).reduceByKey(lambda x, y: x + y).cache()
        danglingMass = PR.filter(lambda pair: pair[0] == '*').collect()[0][1]
        danglingMassBroadcast = sc.broadcast(danglingMass)
        graph = PR.rightOuterJoin(graph, numPartitions=numPartitionsBroadcast.value).map(collectNewGraph)

    graph.saveAsTextFile(sys.argv[2])
    sc.stop()
In [150]:
#in console
#time ./bin/spark-submit /Users/davidadams/Documents/W261/hw13/pagerank.py /Users/davidadams/Documents/W261/hw13/PageRank-test.txt /Users/davidadams/Documents/W261/hw13/PageRank-test_result
# real 0m18.105s
# user 0m45.240s
# sys 0m4.072s
!cat PageRank-test_result_local/*
In [152]:
#on EMR
#time ~/spark/bin/spark-submit --master yarn-cluster ./pagerank.py s3n://ucb-mids-mls-katieadams/PageRank-test.txt s3://ucb-mids-mls-katieadams/PageRank-test_result
# real 0m53.101s
# user 0m8.532s
# sys 0m1.184s
!cat PageRank-test_result_emr/*
HW 13.2: Applying PageRank to the Wikipedia hyperlinks network
Run your Spark PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).
Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15).
Plot the PageRank values for the top 100 pages resulting from the 50-iteration run. Then plot the PageRank values for the same 100 pages that resulted from the 10-iteration run. Comment on your findings. Have the top 100 ranked pages changed? Have the PageRank values changed? Explain. (A sketch for extracting the top 100 nodes with their page titles follows the data locations below.)
Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete your job.
NOTE: Wikipedia data is located on S3 at
s3://ucb-mids-mls-networks/wikipedia/
-- s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt # Graph
-- s3://ucb-mids-mls-networks/wikipedia/indices.txt # Page titles and page Ids
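Once the Wikipedia job finishes, the top 100 nodes could be pulled and joined with the page titles roughly as sketched below. This is a hedged sketch only: the result path comes from the EMR command in the next cell, the saved records are assumed to be (node, (pagerank, edges)) tuples written by saveAsTextFile, and the field order in indices.txt (page title first, node id second) is an assumption that should be checked against the actual file.
# Hedged sketch: extract the top 100 PageRank nodes and attach page titles.
from ast import literal_eval

results = sc.textFile("s3://ucb-mids-mls-katieadams/sparktest/pr-wiki_results") \
            .map(literal_eval) \
            .map(lambda pair: (pair[0], pair[1][0]))          # (node_id, pagerank)
top100 = results.takeOrdered(100, key=lambda pair: -pair[1])

# Assumed tab-delimited format: page_title \t node_id \t ... -- adjust if different
indices = sc.textFile("s3://ucb-mids-mls-networks/wikipedia/indices.txt") \
            .map(lambda line: line.split('\t')) \
            .map(lambda fields: (fields[1], fields[0]))       # (node_id, page_title)
top_ids = set(node_id for node_id, _ in top100)
titles = dict(indices.filter(lambda p: p[0] in top_ids).collect())

for node_id, pr in top100:
    print titles.get(node_id, node_id), pr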
In [ ]:
#on EMR
#time ~/spark/bin/spark-submit --master yarn-cluster ./pagerank.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt s3://ucb-mids-mls-katieadams/sparktest/pr-wiki_results
# Didn't finish running before submitting (cluster of 10 m3.xlarge instances)
HW 13.3: Spark GraphX versus your implementation of PageRank
Run the Spark GraphX PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).
Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15).
Have the top 100 ranked pages changed? Comment on your findings. Plot both top-100 curves.
Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.
Put the runtime results of HW13.2 and HW13.3 in a tabular format (with rows corresponding to implementation and columns corresponding to experiment setup (10 iterations, 50 iterations)). Discuss the run times and explain the differences.
Plot the PageRank values for the top 100 pages resulting from the 50-iteration run (using GraphX). Then plot the PageRank values for the same 100 pages that resulted from the 50-iteration run of your homegrown PageRank implementation. Comment on your findings. Have the top 100 ranked pages changed? Have the PageRank values changed? Explain.
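A plotting sketch for the side-by-side comparison might look like the following. It assumes `graphx_top100` and `homegrown_top100` are lists of (page_title, pagerank) pairs already extracted from the two result sets; those variable names are placeholders, not outputs produced above.
# Hedged sketch: overlay the top-100 PageRank curves from the two runs.
import matplotlib.pyplot as plt

graphx_vals = [pr for _, pr in graphx_top100]
# Re-order the homegrown values to follow the GraphX top-100 page ordering
homegrown_lookup = dict(homegrown_top100)
homegrown_vals = [homegrown_lookup.get(title, 0.0) for title, _ in graphx_top100]

plt.figure(figsize=(10, 5))
plt.plot(range(1, 101), graphx_vals, label='GraphX (50 iterations)')
plt.plot(range(1, 101), homegrown_vals, label='Homegrown (50 iterations)')
plt.xlabel('Page rank position (GraphX ordering)')
plt.ylabel('PageRank value')
plt.legend()
plt.title('Top-100 PageRank values: GraphX vs. homegrown implementation')
plt.show()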
In [ ]:
HW 13.4: Criteo Phase 2 baseline
The Criteo data is located in the following S3 bucket: criteo-dataset
https://console.aws.amazon.com/s3/home?region=us-west-1#&bucket=criteo-dataset&prefix=
Using the training dataset, validation dataset and testing dataset in the Criteo bucket perform the following experiment:
-- write Spark code (borrow from Phase 1 of this project) to train a logistic regression model with the following hyperparameters:
-- Number of buckets for hashing: 1,000
-- Logistic Regression: no regularization term
-- Logistic Regression: step size = 10
Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.
Report in tabular form the AUC value (https://en.wikipedia.org/wiki/Receiver_operating_characteristic) for the Training, Validation, and Testing datasets.
Report in tabular form the log loss for the Training, Validation, and Testing datasets.
Don't forget to put a caption on your tables (above each table).
In [133]:
%%writefile ctr_logreg.py
import sys
from pyspark import SparkContext
from collections import defaultdict
import hashlib
from math import log, exp
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import BinaryClassificationMetrics
def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Note:
        Use printMapping=True for debug purposes and to better understand how the hashing works.

    Args:
        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation. Represented as
            (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float: The keys will be integers which represent the buckets that the
            features have been hashed to. The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if printMapping:
        print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)
def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

    Args:
        point (str): A \t separated string where the first value is the label and the rest are
            features.
        numBuckets: The number of buckets to hash to.

    Returns:
        LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed
            features.
    """
    feats = point.split('\t')
    label = float(feats[0])
    featlist = [(i - 1, feat) for i, feat in enumerate(feats) if i > 0]
    hashSparseVector = SparseVector(numBuckets, hashFunction(numBuckets, featlist, printMapping=False))
    return LabeledPoint(label, hashSparseVector)
def getP(x, w, intercept):
    """Calculate the probability for an observation given a set of weights and intercept.

    Note:
        We'll bound our raw prediction between 20 and -20 for numerical purposes.

    Args:
        x (SparseVector): A vector with values of 1.0 for features that exist in this
            observation and 0.0 otherwise.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

    Returns:
        float: A probability between 0 and 1.
    """
    rawPrediction = x.dot(w) + intercept
    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    return 1.0 / (1.0 + exp(-1.0 * rawPrediction))
def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probability and label.

    Note:
        log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it
        and when p is 1 we need to subtract a small value (epsilon) from it.

    Args:
        p (float): A probability between 0 and 1.
        y (int): A label. Takes on the values 0 and 1.

    Returns:
        float: The log loss value.
    """
    epsilon = 10e-12
    if p == 0:
        p = p + epsilon
    elif p == 1:
        p = p - epsilon
    if y == 1:
        return -1.0 * log(p)
    elif y == 0:
        return -1.0 * log(1 - p)
    else:
        return None
def evaluateResults(model, data):
    """Calculates the log loss for the data given the model.

    Args:
        model (LogisticRegressionModel): A trained logistic regression model.
        data (RDD of LabeledPoint): Labels and features for each observation.

    Returns:
        float: Log loss for the data.
    """
    return data.map(lambda point: computeLogLoss(getP(point.features, model.weights, model.intercept),
                                                 point.label)).mean()
if __name__ == "__main__":
    sc = SparkContext(appName="CTR2")

    rawData = sc.textFile(sys.argv[1])

    # Split into train/validation/test with a fixed seed for reproducibility
    weights = [.8, .1, .1]
    seed = 42
    rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights, seed=seed)

    # Cache the raw splits
    rawTrainData.cache()
    rawValidationData.cache()
    rawTestData.cache()

    # Hash the raw features into a fixed number of buckets
    numBucketsCTR = 1000
    hashTrainData = rawTrainData.map(lambda point: parseHashPoint(point, numBucketsCTR))
    hashTrainData.cache()
    hashValidationData = rawValidationData.map(lambda point: parseHashPoint(point, numBucketsCTR))
    hashValidationData.cache()
    hashTestData = rawTestData.map(lambda point: parseHashPoint(point, numBucketsCTR))
    hashTestData.cache()

    # Train logistic regression with SGD: no regularization, step size 10
    numIters = 15
    regType = None
    includeIntercept = True
    stepSize = 10.0
    model = LogisticRegressionWithSGD.train(hashTrainData, numIters, stepSize,
                                            regType=regType, intercept=includeIntercept)

    logLossTrain = evaluateResults(model, hashTrainData)
    logLossVal = evaluateResults(model, hashValidationData)
    logLossTest = evaluateResults(model, hashTestData)

    # BinaryClassificationMetrics expects (score, label) pairs
    labelsAndScoresTrain = hashTrainData.map(
        lambda lp: (getP(lp.features, model.weights, model.intercept), lp.label))
    metricsTrain = BinaryClassificationMetrics(labelsAndScoresTrain)
    labelsAndScoresVal = hashValidationData.map(
        lambda lp: (getP(lp.features, model.weights, model.intercept), lp.label))
    metricsVal = BinaryClassificationMetrics(labelsAndScoresVal)
    labelsAndScoresTest = hashTestData.map(
        lambda lp: (getP(lp.features, model.weights, model.intercept), lp.label))
    metricsTest = BinaryClassificationMetrics(labelsAndScoresTest)

    results_list = [('train', logLossTrain, metricsTrain.areaUnderROC),
                    ('val', logLossVal, metricsVal.areaUnderROC),
                    ('test', logLossTest, metricsTest.areaUnderROC)]
    results_rdd = sc.parallelize(results_list)
    results_rdd.saveAsTextFile(sys.argv[2])
    sc.stop()
In [110]:
# test locally
#time ./bin/spark-submit /Users/davidadams/Documents/W261/hw13/ctr_logreg.py /Users/davidadams/Documents/W261/hw13/dac_sample.txt /Users/davidadams/Documents/W261/hw13/CTR-test-result.txt
# run on EMR
# Cluster: emr-4.2.0, 15 m3.xlarge instances, maximizeResourceAllocation=True
#time ~/spark/bin/spark-submit --master yarn-cluster ./ctr_logreg.py s3://criteo-dataset/train.txt s3://ucb-mids-mls-katieadams/ctr_results_new
# real 12m54.512s
# user 0m8.600s
# sys 0m1.128s
In [144]:
from ast import literal_eval

def makeTable(filename):
    with open(filename, 'r') as resultsfile:
        print "Criteo CTR Logistic Regression Results Table"
        print " (Parameters: 15 iterations, step size = 10, no regularization)"
        print "\nData\t\tLogLoss\tAUC"
        print "-------------------------------"
        for line in resultsfile:
            data, logloss, auc = literal_eval(line)
            if data == 'train':
                print("Train\t\t%.4f\t%.4f" % (logloss, auc))
            elif data == 'test':
                print("Test\t\t%.4f\t%.4f" % (logloss, auc))
            else:
                print("Validation\t%.4f\t%.4f" % (logloss, auc))

filename = 'CTR_results.txt'
makeTable(filename)
HW 13.5: Criteo Phase 2 hyperparameter tuning
Using the training dataset, validation dataset and testing dataset in the Criteo bucket perform the following experiments:
-- write Spark code (borrow from Phase 1 of this project) to train a logistic regression model with various hyperparameters. Do a grid search of the hyperparameter space and determine optimal settings using the validation set (a driver-loop sketch follows these requirements).
-- Number of buckets for hashing: 1,000, 10,000, ... explore different values here
-- Logistic Regression: regularization term: [1e-6, 1e-3] explore other values here also
-- Logistic Regression: step size: explore different step sizes. Focus on a step size of 1 initially.
Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.
Report in tabular form and using heatmaps the AUC values (https://en.wikipedia.org/wiki/Receiver_operating_characteristic) for the Training, Validation, and Testing datasets.
Report in tabular form and using heatmaps the log loss values for the Training, Validation, and Testing datasets.
Don't forget to put a caption on your tables (above the table) and on your heatmap figures (put the caption below the figures) detailing the experiment associated with each table or figure (data, algorithm used, parameters and settings explored).
Discuss the optimal setting to solve this problem in terms of the following:
-- Features
-- Learning algorithm
-- Spark cluster
Justify your recommendations based on your experimental results and cross-reference with table numbers and figure numbers. Also highlight key results with annotations, both textual and line- and box-based, on your tables and graphs.
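A minimal sketch of the grid-search driver loop, reusing parseHashPoint and evaluateResults from ctr_logreg.py above. The candidate hyperparameter values are illustrative assumptions rather than prescribed settings, and rawTrainData / rawValidationData are assumed to already be loaded and cached in the session.
# Hedged sketch of a hyperparameter grid search over (buckets, regularization, step size).
# The candidate values below are illustrative, not prescribed by the assignment.
import itertools
from pyspark.mllib.classification import LogisticRegressionWithSGD

bucketGrid = [1000, 10000]
regGrid = [1e-6, 1e-3]
stepGrid = [1.0, 10.0]

results = []
for numBuckets, regParam, stepSize in itertools.product(bucketGrid, regGrid, stepGrid):
    # Re-hash the features for the current bucket count
    hashTrain = rawTrainData.map(lambda p: parseHashPoint(p, numBuckets)).cache()
    hashVal = rawValidationData.map(lambda p: parseHashPoint(p, numBuckets)).cache()
    model = LogisticRegressionWithSGD.train(hashTrain, iterations=15, step=stepSize,
                                            regParam=regParam, regType='l2', intercept=True)
    valLogLoss = evaluateResults(model, hashVal)
    results.append((numBuckets, regParam, stepSize, valLogLoss))
    hashTrain.unpersist()
    hashVal.unpersist()

# Pick the setting with the lowest validation log loss
best = min(results, key=lambda r: r[3])
print 'best (buckets, regParam, stepSize, valLogLoss):', best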
In [ ]: