DATASCI W261: Machine Learning at Scale

Week 14, Homework 13

Katrina Adams

kradams@ischool.berkeley.edu
9 December 2015


Start Spark


In [1]:
%cd ~/Documents/W261/hw13/


/Users/davidadams/Documents/W261/hw13

In [1]:
import os
import sys

spark_home = os.environ['SPARK_HOME'] = \
   '/Users/davidadams/packages/spark-1.5.1-bin-hadoop2.6/'

if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0,os.path.join(spark_home,'python'))
sys.path.insert(0,os.path.join(spark_home,'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home,'python/pyspark/shell.py'))


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
SparkContext available as sc, HiveContext available as sqlContext.

HW 13.1: Spark implementation of basic PageRank

Write a basic Spark implementation of the iterative PageRank algorithm that takes sparse adjacency lists as input.
Make sure that your implementation uses teleportation (each node receives a (1 - d)/N share of the total mass, where d is the damping factor and N is the number of nodes in the network), and that it redistributes the mass of dangling nodes on each iteration so that the output of every iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page, chooses the next page to which it will move by clicking at random, with probability d, one of the hyperlinks in the current page. This probability is represented by a so-called ‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer jumps to any web page in the network. If a page is a dangling end, meaning it has no outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform distribution and “teleports” to that page]

In your Spark solution, please use broadcast variables and caching to make sure your code is as efficient as possible.

As you build your code, use the following test data to check your implementation:

s3://ucb-mids-mls-networks/PageRank-test.txt

Set the teleportation parameter to 0.15 (1 - d, where d, the damping factor, is set to 0.85), and crosscheck your work against the true result displayed in the first image of the Wikipedia article:

https://en.wikipedia.org/wiki/PageRank

Here, for reference, are the corresponding PageRank probabilities:

A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016

Run this experiment locally first. Report the local configuration that you used and how long in minutes and seconds it takes to complete your job.

Repeat this experiment on AWS. Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete your job. (in your notebook, cat the cluster config file)
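
To make the update concrete, the per-node formula used in the implementation below is PR'(n) = alpha * (1/G) + (1 - alpha) * (M/G + S(n)), where alpha is the teleportation parameter (0.15), G the number of nodes, M the total mass sitting on dangling nodes during the iteration, and S(n) the mass n receives from its in-links. Here is a minimal, self-contained sketch of that formula (the function name updated_rank is illustrative and not part of the graded code):

# Sketch: per-node PageRank update with teleportation and dangling-mass redistribution.
# alpha = teleportation parameter (1 - d), G = number of nodes in the graph,
# M = total dangling mass collected this iteration,
# inbound = sum of weight/out-degree contributions arriving from in-links.
def updated_rank(inbound, M, G, alpha=0.15):
    return alpha * (1.0 / G) + (1.0 - alpha) * (M / G + inbound)

# On the 11-node test graph, a node with no in-links and no dangling mass in the
# network would receive only the teleportation share, alpha / G:
print updated_rank(0.0, 0.0, 11)   # 0.013636...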


In [61]:
from ast import literal_eval
from operator import add
from pprint import pprint
from time import time


def sendWeights(pair):
    # pair = (node, (current PageRank weight, adjacency dict or None))
    wt = pair[1][0]
    if pair[1][1] is None:
        # dangling node: route its mass to the special '*' key for later redistribution
        return [('*',wt)]
    destnodes = pair[1][1].keys()
    sendweights = []
    numDestNodes = len(destnodes)
    for n in destnodes:
        sendweights.append((n,1.0*wt/numDestNodes))
    return sendweights



def collectNewGraph(pair):
    # pair = (node, (summed inbound mass or None, (old PageRank, adjacency dict)))
    G = numNodesBroadcast.value
    M = danglingMassBroadcast.value
    alpha = 0.15
    key = pair[0]
    PR = pair[1][0]
    if PR is None:
        PR = 0

    (old_pr, edges) = pair[1][1]

    # teleportation + redistributed dangling mass + mass received from in-links
    PR_adj = alpha*(1.0/G)+(1.0-alpha)*(1.0*M/G+PR)
    return (key, (PR_adj, edges))

starttime = time()

graphfilename = "./PageRank-test.txt"
graph_input = sc.textFile(graphfilename).map(lambda x: x.split('\t')).map(lambda pair: (pair[0],literal_eval(pair[1])))#.cache()
numPartitions = graph_input.getNumPartitions()


nodes1 = graph_input.map(lambda pair: (pair[0],1))
nodes2 = graph_input.flatMap(lambda pair: pair[1].keys()).map(lambda n: (n,1))
allNodes = nodes1.union(nodes2).map(lambda p: (p[0],None)).distinct()
numNodes = allNodes.count()

numNodesBroadcast = sc.broadcast(numNodes)

graph = allNodes.leftOuterJoin(graph_input, numPartitions=numPartitions).map(lambda pair: (pair[0],(1.0/numNodesBroadcast.value,pair[1][1])))#.cache()


numIters = 50
for i in range(numIters):
    print 'iteration:',i+1

    PR = graph.flatMap(sendWeights).reduceByKey(lambda x,y: x+y).cache()

    danglingMass = PR.filter(lambda pair: pair[0]=='*').collect()[0][1]
    danglingMassBroadcast = sc.broadcast(danglingMass)

    graph = PR.rightOuterJoin(graph, numPartitions=numPartitions).map(collectNewGraph)


print '\n\n' 
pprint(graph.collect())

runtime = time() - starttime
print("elapsed time: %.4f minutes, %.6f seconds" % (int(runtime/60), runtime-int(runtime/60)))


iteration: 1
iteration: 2
...
iteration: 50



[('A', (0.03278149315934767, None)),
 ('C', (0.3429414533690357, {'B': 1})),
 ('E', (0.08088569323450433, {'B': 1, 'D': 1, 'F': 1})),
 (u'G', (0.01616947901685893, {'B': 1, 'E': 1})),
 (u'I', (0.01616947901685893, {'B': 1, 'E': 1})),
 (u'K', (0.01616947901685893, {'E': 1})),
 (u'J', (0.01616947901685893, {'E': 1})),
 ('D', (0.03908709209997012, {'A': 1, 'B': 1})),
 ('F', (0.03908709209997012, {'B': 1, 'E': 1})),
 (u'H', (0.01616947901685893, {'B': 1, 'E': 1})),
 ('B', (0.3843697809528768, {'C': 1}))]
elapsed time: 0.0000 minutes, 10.095697 seconds
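
As a quick check on the normalization requirement (the ranks should sum to 1 after every iteration), the final mass can be summed directly. This is a small sketch that assumes the graph RDD from the cell above is still in scope:

# Sketch: verify that the PageRank values are normalized after the final iteration.
totalMass = graph.map(lambda pair: pair[1][0]).sum()
print 'total PageRank mass: %.6f' % totalMass   # should be very close to 1.0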

In [119]:
correctweights = {'A':0.033, 'B':0.384, 'C':0.343 , 'D':0.039, 'E':0.081, 'F':0.039, 
                   'G':0.016, 'H':0.016, 'I':0.016, 'J':0.016, 'K':0.016}

pprint(correctweights)


{'A': 0.033,
 'B': 0.384,
 'C': 0.343,
 'D': 0.039,
 'E': 0.081,
 'F': 0.039,
 'G': 0.016,
 'H': 0.016,
 'I': 0.016,
 'J': 0.016,
 'K': 0.016}
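
For a programmatic crosscheck (a sketch that assumes both graph and correctweights are still in scope), the computed values can be rounded to three decimals and compared against the reference values above:

# Sketch: compare computed PageRank values (rounded to 3 decimals) with the reference values.
computed = dict((node, round(vals[0], 3)) for node, vals in graph.collect())
for node in sorted(correctweights):
    status = 'OK' if abs(computed[node] - correctweights[node]) <= 0.001 else 'MISMATCH'
    print '%s\tcomputed=%.3f\texpected=%.3f\t%s' % (node, computed[node], correctweights[node], status)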

In [149]:
%%writefile pagerank.py
import sys
from pyspark import SparkContext

from ast import literal_eval
from operator import add
from pprint import pprint
from time import time


def sendWeights(pair):
    wt = pair[1][0]
    if pair[1][1] is None:
        return [('*',wt)]
    destnodes = pair[1][1].keys()
    sendweights = []
    numDestNodes = len(destnodes)
    for n in destnodes:
        sendweights.append((n,1.0*wt/numDestNodes))
    return sendweights


def collectNewGraph(pair):

    G = numNodesBroadcast.value
    M = danglingMassBroadcast.value
    alpha = 0.15
    key = pair[0]
    PR = pair[1][0]
    if PR is None:
        PR=0

    (old_pr, edges) = pair[1][1]
    
    PR_adj = alpha*(1.0/G)+(1.0-alpha)*(1.0*M/G+PR) 
    return (key, (PR_adj, edges))


if __name__=="__main__":
        
    sc = SparkContext(appName="PageRank")

    graphfilename = sys.argv[1]
    graph_input = sc.textFile(graphfilename).map(lambda x: x.split('\t')).map(lambda pair: (pair[0],literal_eval(pair[1])))#.cache()
    numPartitions = graph_input.getNumPartitions()
    numPartitionsBroadcast = sc.broadcast(numPartitions)


    nodes1 = graph_input.map(lambda pair: (pair[0],1))
    nodes2 = graph_input.flatMap(lambda pair: pair[1].keys()).map(lambda n: (n,1))
    allNodes = nodes1.union(nodes2).map(lambda p: (p[0],None)).distinct()
    numNodes = allNodes.count()

    numNodesBroadcast = sc.broadcast(numNodes)

    graph = allNodes.leftOuterJoin(graph_input, numPartitions=numPartitionsBroadcast.value).map(lambda pair: (pair[0],(1.0/numNodesBroadcast.value,pair[1][1])))#.cache()

    numIters = 40
    for i in range(numIters):

        PR = graph.flatMap(sendWeights).reduceByKey(lambda x,y: x+y).cache()

        danglingMass = PR.filter(lambda pair: pair[0]=='*').collect()[0][1]
        danglingMassBroadcast = sc.broadcast(danglingMass)

        graph = PR.rightOuterJoin(graph, numPartitions=numPartitionsBroadcast.value).map(collectNewGraph)

    
    graph.saveAsTextFile(sys.argv[2])
    sc.stop()


Overwriting pagerank.py

In [150]:
#in console
#time ./bin/spark-submit  /Users/davidadams/Documents/W261/hw13/pagerank.py /Users/davidadams/Documents/W261/hw13/PageRank-test.txt /Users/davidadams/Documents/W261/hw13/PageRank-test_result
# real	0m18.105s
# user	0m45.240s
# sys	0m4.072s

!cat PageRank-test_result_local/*


('A', (0.03278149316111876, None))
('C', (0.3430685989242862, {'B': 1}))
('E', (0.08088569323767447, {'B': 1, 'D': 1, 'F': 1}))
(u'G', (0.016169479017118762, {'B': 1, 'E': 1}))
(u'I', (0.016169479017118762, {'B': 1, 'E': 1}))
(u'K', (0.016169479017118762, {'E': 1}))
(u'J', (0.016169479017118762, {'E': 1}))
('D', (0.03908709210193935, {'A': 1, 'B': 1}))
('F', (0.03908709210193935, {'B': 1, 'E': 1}))
(u'H', (0.016169479017118762, {'B': 1, 'E': 1}))
('B', (0.3842426353874476, {'C': 1}))

In [152]:
#on EMR
#time ~/spark/bin/spark-submit --master yarn-cluster ./pagerank.py s3n://ucb-mids-mls-katieadams/PageRank-test.txt s3://ucb-mids-mls-katieadams/PageRank-test_result
# real	0m53.101s
# user	0m8.532s
# sys	0m1.184s

!cat PageRank-test_result_emr/*


('A', (0.032781493161118759, None))
('C', (0.34306859892428621, {'B': 1}))
('E', (0.080885693237674472, {'B': 1, 'D': 1, 'F': 1}))
(u'G', (0.016169479017118762, {'B': 1, 'E': 1}))
(u'I', (0.016169479017118762, {'B': 1, 'E': 1}))
(u'K', (0.016169479017118762, {'E': 1}))
(u'J', (0.016169479017118762, {'E': 1}))
('D', (0.03908709210193935, {'A': 1, 'B': 1}))
('F', (0.03908709210193935, {'B': 1, 'E': 1}))
(u'H', (0.016169479017118762, {'B': 1, 'E': 1}))
('B', (0.38424263538744757, {'C': 1}))

HW 13.2: Applying PageRank to the Wikipedia hyperlinks network

Run your Spark PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).

Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15).
Plot the pagerank values for the top 100 pages resulting from the 50 iterations run. Then plot the pagerank values for the same 100 pages that resulted from the 10 iterations run. Comment on your findings. Have the top 100 ranked pages changed? Have the pagerank values changed? Explain.

Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete your job.

NOTE: Wikipedia data is located on S3 at
s3://ucb-mids-mls-networks/wikipedia/
-- s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt # Graph
-- s3://ucb-mids-mls-networks/wikipedia/indices.txt # Page titles and page Ids
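
The Wikipedia run below did not finish before the deadline, but for reference, here is a hedged sketch of how the top 100 nodes would be pulled from the saved PageRank output and joined with indices.txt to recover page titles. It assumes the output lines have the (node, (rank, edges)) format written by pagerank.py, and that indices.txt is tab-delimited with the page title in the first field and the node id in the second; both assumptions should be verified against the actual files.

# Sketch: top-100 Wikipedia pages by PageRank, joined with page titles.
# Assumes pagerank.py output lines look like "(u'12345', (0.0012, {...}))" and that
# indices.txt is tab-delimited as title \t node_id (verify before relying on this).
from ast import literal_eval

ranks = sc.textFile('s3://ucb-mids-mls-katieadams/sparktest/pr-wiki_results') \
          .map(literal_eval) \
          .map(lambda pair: (pair[0], pair[1][0]))

titles = sc.textFile('s3://ucb-mids-mls-networks/wikipedia/indices.txt') \
           .map(lambda line: line.split('\t')) \
           .map(lambda fields: (fields[1], fields[0]))

top100 = ranks.join(titles) \
              .map(lambda pair: (pair[1][0], pair[1][1])) \
              .takeOrdered(100, key=lambda x: -x[0])

for rank, title in top100:
    print '%.6f\t%s' % (rank, title)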


In [ ]:
#on EMR
#time ~/spark/bin/spark-submit --master yarn-cluster ./pagerank.py s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt s3://ucb-mids-mls-katieadams/sparktest/pr-wiki_results

# Didn't finish running before submitting (cluster of 10 m3.xlarge instances)

HW 13.3: Spark GraphX versus your implementation of PageRank

Run the Spark GraphX PageRank implementation on the Wikipedia dataset for 10 iterations, and display the top 100 ranked nodes (with alpha = 0.85).

Run your PageRank implementation on the Wikipedia dataset for 50 iterations, and display the top 100 ranked nodes (with teleportation factor of 0.15).
Have the top 100 ranked pages changed? Comment on your findings. Plot both sets of 100 PageRank values.

Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.

Put the runtime results of HW13.2 and HW13.3 in a tabular format (with rows corresponding to implementation and columns corresponding to experiment setup (10 iterations, 50 iterations)). Discuss the run times and explain the differences.

Plot the pagerank values for the top 100 pages resulting from the 50 iterations run (using GraphX). Then plot the pagerank values for the same 100 pages that resulted from the 50 iterations run of your homegrown pagerank implementation. Comment on your findings. Have the top 100 ranked pages changed? Have the pagerank values changed? Explain.
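
The GraphX and 50-iteration homegrown runs were not completed, but for reference, here is a minimal matplotlib sketch of the requested comparison plot. It assumes two hypothetical dictionaries, graphx_ranks and homegrown_ranks, mapping page titles to PageRank values from the two 50-iteration runs:

# Sketch: overlay the top-100 PageRank values from two implementations.
# graphx_ranks and homegrown_ranks are hypothetical dicts of {page_title: pagerank}
# built from the GraphX output and from the pagerank.py output, respectively.
import matplotlib.pyplot as plt

top100_titles = sorted(graphx_ranks, key=graphx_ranks.get, reverse=True)[:100]
graphx_values = [graphx_ranks[t] for t in top100_titles]
homegrown_values = [homegrown_ranks.get(t, 0.0) for t in top100_titles]

plt.figure(figsize=(10, 5))
plt.plot(range(100), graphx_values, label='GraphX, 50 iterations')
plt.plot(range(100), homegrown_values, label='Homegrown, 50 iterations')
plt.xlabel('Page position (ordered by GraphX PageRank)')
plt.ylabel('PageRank value')
plt.title('Top 100 Wikipedia pages: GraphX vs. homegrown PageRank')
plt.legend()
plt.show()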


In [ ]:


HW 13.4: Criteo Phase 2 baseline

The Criteo data is located in the following S3 bucket: criteo-dataset
https://console.aws.amazon.com/s3/home?region=us-west-1#&bucket=criteo-dataset&prefix=

Using the training dataset, validation dataset and testing dataset in the Criteo bucket perform the following experiment:

-- write spark code (borrow from Phase 1 of this project) to train a logistic regression model with the following hyperparameters:

-- Number of buckets for hashing: 1,000
-- Logistic Regression: no regularization term
-- Logistic Regression: step size = 10

Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.

Report in tabular form the AUC value (https://en.wikipedia.org/wiki/Receiver_operating_characteristic) for the Training, Validation, and Testing datasets.
Report in tabular form the log loss for the Training, Validation, and Testing datasets.

Don't forget to put a caption on your tables (above each table).
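
As a small reference point for the log loss values reported below (a sketch, not part of the submitted job): for a single observation, log loss is l(p, y) = -(y*log(p) + (1 - y)*log(1 - p)), so a classifier that always predicts p = 0.5 scores ln 2 ≈ 0.693 regardless of the label, which gives a useful baseline for the table.

# Sketch: log loss for a single prediction, l(p, y) = -(y*log(p) + (1-y)*log(1-p)).
from math import log

def log_loss(p, y):
    return -(y * log(p) + (1 - y) * log(1 - p))

print log_loss(0.5, 1)   # 0.6931... (constant-0.5 baseline)
print log_loss(0.9, 1)   # 0.1053... (confident and correct)
print log_loss(0.9, 0)   # 2.3025... (confident and wrong)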


In [133]:
%%writefile ctr_logreg.py
import sys
from pyspark import SparkContext

from collections import defaultdict
import hashlib
from math import log, exp

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import BinaryClassificationMetrics



def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Note:
        Use printMapping=True for debug purposes and to better understand how the hashing works.

    Args:
        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if printMapping: print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)


def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

    Args:
        point (str): A \t separated string where the first value is the label and the rest are
            features.
        numBuckets: The number of buckets to hash to.

    Returns:
        LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed
            features.
    """
    feats = point.split('\t')
    featlist = []
    for i,feat in enumerate(feats):
        if i==0:
            label=float(feat)
        else:
            featlist.append((i-1,feat))

    hashSparseVector = SparseVector(numBuckets, hashFunction(numBuckets, featlist, printMapping=False))  
    return LabeledPoint(label, hashSparseVector)


def getP(x, w, intercept):
    """Calculate the probability for an observation given a set of weights and intercept.

    Note:
        We'll bound our raw prediction between 20 and -20 for numerical purposes.

    Args:
        x (SparseVector): A vector with values of 1.0 for features that exist in this
            observation and 0.0 otherwise.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

    Returns:
        float: A probability between 0 and 1.
    """
    rawPrediction = x.dot(w)+intercept

    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    
    return 1.0/(1.0+exp(-1.0*rawPrediction))


def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probabilty and label.

    Note:
        log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it
        and when p is 1 we need to subtract a small value (epsilon) from it.

    Args:
        p (float): A probabilty between 0 and 1.
        y (int): A label.  Takes on the values 0 and 1.

    Returns:
        float: The log loss value.
    """
    epsilon = 10e-12
    if p==0:
        p = p+epsilon
    elif p==1:
        p = p-epsilon
    
    if y==1:
        return -1.0*log(p)
    elif y==0:
        return -1.0*log(1-p)
    else:
        return None


def evaluateResults(model, data):
    """Calculates the log loss for the data given the model.

    Args:
        model (LogisticRegressionModel): A trained logistic regression model.
        data (RDD of LabeledPoint): Labels and features for each observation.

    Returns:
        float: Log loss for the data.
    """
    
    return data.map(lambda point: computeLogLoss(getP(point.features, model.weights, model.intercept), point.label)).mean()


if __name__=="__main__":

    
    sc = SparkContext(appName="CTR2")
    rawData = sc.textFile(sys.argv[1])

    weights = [.8, .1, .1]
    seed = 42
    # Use randomSplit with weights and seed
    rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights, seed=seed)

    # Cache the data
    rawTrainData.cache()
    rawValidationData.cache()
    rawTestData.cache()
    
    numBucketsCTR = 1000
    hashTrainData = rawTrainData.map(lambda point: parseHashPoint(point, numBucketsCTR))
    hashTrainData.cache()
    hashValidationData = rawValidationData.map(lambda point: parseHashPoint(point, numBucketsCTR))
    hashValidationData.cache()
    hashTestData = rawTestData.map(lambda point: parseHashPoint(point, numBucketsCTR))
    hashTestData.cache()
    
    numIters = 15
    regType = None
    includeIntercept = True
    stepSize = 10.0
    model = LogisticRegressionWithSGD.train(hashTrainData, numIters, stepSize, regType=regType, intercept=includeIntercept)

    logLossTrain = evaluateResults(model, hashTrainData)
    logLossVal = evaluateResults(model, hashValidationData)
    logLossTest = evaluateResults(model, hashTestData)
    
    labelsAndScoresTrain = hashTrainData.map(lambda lp:
                                            (lp.label, getP(lp.features, model.weights, model.intercept)))
    metricsTrain = BinaryClassificationMetrics(labelsAndScoresTrain)

    labelsAndScoresVal = hashValidationData.map(lambda lp:
                                            (lp.label, getP(lp.features, model.weights, model.intercept)))
    metricsVal = BinaryClassificationMetrics(labelsAndScoresVal)

    labelsAndScoresTest = hashTestData.map(lambda lp:
                                            (lp.label, getP(lp.features, model.weights, model.intercept)))
    metricsTest = BinaryClassificationMetrics(labelsAndScoresTest)

    results_list = [('train',logLossTrain, metricsTrain.areaUnderROC), ('val',logLossVal, metricsVal.areaUnderROC), ('test',logLossTest, metricsTest.areaUnderROC)]
    results_rdd = sc.parallelize(results_list)
    results_rdd.saveAsTextFile(sys.argv[2])
    
    sc.stop()


Overwriting ctr_logreg.py

In [110]:
# test locally
#time ./bin/spark-submit  /Users/davidadams/Documents/W261/hw13/ctr_logreg.py /Users/davidadams/Documents/W261/hw13/dac_sample.txt /Users/davidadams/Documents/W261/hw13/CTR-test-result.txt

# run on EMR
# Cluster: emr-4.2.0, 15 m3.xlarge instances, maximizeResourceAllocation=True
#time ~/spark/bin/spark-submit --master yarn-cluster ./ctr_logreg.py s3://criteo-dataset/train.txt s3://ucb-mids-mls-katieadams/ctr_results_new
# real	12m54.512s
# user	0m8.600s
# sys	0m1.128s

In [144]:
from ast import literal_eval

def makeTable(filename):
    resultsfile = open(filename, 'r')
    
    print "Criteo CTR Logistic Regression Results Table"
    print "   (Parameters: 15 iterations, step size = 10, no regularization)"
    print "\nData\t\tLogLoss\tAUC"
    print "-------------------------------"
    for line in resultsfile.readlines():
        line_tup = literal_eval(line)
        data = line_tup[0]
        logloss = line_tup[1]
        auc = line_tup[2]
        if data=='train':
            print("Train\t\t%.4f\t%.4f" % (logloss,auc))
        elif data=='test':
            print("Test\t\t%.4f\t%.4f" % (logloss,auc))
        else:
            print("Validation\t%.4f\t%.4f" % (logloss,auc))
            
        
filename = 'CTR_results.txt'
makeTable(filename)


Criteo CTR Logistic Regression Results Table
   (Parameters: 15 iterations, step size = 10, no regularization)

Data		LogLoss	AUC
-------------------------------
Train		1.0087	0.6030
Validation	1.0085	0.6030
Test		1.0078	0.6034

HW 13.5: Criteo Phase 2 hyperparameter tuning

Using the training dataset, validation dataset and testing dataset in the Criteo bucket perform the following experiments:

-- write spark code (borrow from Phase 1 of this project) to train logistic regression models with various hyperparameters. Do a grid search of the hyperparameter space and determine the optimal settings using the validation set.

-- Number of buckets for hashing: 1,000, 10,000, .... explore different values here
-- Logistic Regression: regularization term: [1e-6, 1e-3] explore other values here also
-- Logistic Regression: step size: explore different step sizes. Focus on a step size of 1 initially.

Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.

Report in tabular form and using heatmaps the AUC values (https://en.wikipedia.org/wiki/Receiver_operating_characteristic) for the Training, Validation, and Testing datasets.
Report in tabular form and using heatmaps the log loss for the Training, Validation, and Testing datasets.

Don't forget to put a caption on your tables (above the table) and on your heatmap figures (put the caption below the figures), detailing the experiment associated with each table or figure (data, algorithm used, parameters and settings explored).

Discuss the optimal setting to solve this problem in terms of the following:
-- Features
-- Learning algorithm
-- Spark cluster

Justify your recommendations based on your experimental results and cross-reference them with table numbers and figure numbers. Also highlight key results with annotations, both textual and line- and box-based, on your tables and graphs.
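
This grid search was not run before the deadline, but here is a hedged sketch of the driver loop it would use, reusing parseHashPoint and evaluateResults from ctr_logreg.py above. The specific grid values are illustrative assumptions, and rawTrainData / rawValidationData are assumed to be cached RDDs prepared as in HW 13.4:

# Sketch: grid search over hashing buckets, regularization strength, and step size,
# keeping the combination with the lowest validation log loss.
# Assumes parseHashPoint and evaluateResults (from ctr_logreg.py) are in scope,
# along with cached rawTrainData / rawValidationData RDDs as in HW 13.4.
from pyspark.mllib.classification import LogisticRegressionWithSGD

numIters = 15
bestParams, bestLogLoss = None, float('inf')

for numBuckets in [1000, 10000]:
    hashTrain = rawTrainData.map(lambda p, nb=numBuckets: parseHashPoint(p, nb)).cache()
    hashVal = rawValidationData.map(lambda p, nb=numBuckets: parseHashPoint(p, nb)).cache()
    for regParam in [1e-6, 1e-3]:
        for stepSize in [1.0, 10.0]:
            model = LogisticRegressionWithSGD.train(
                hashTrain, numIters, stepSize,
                regParam=regParam, regType='l2', intercept=True)
            valLogLoss = evaluateResults(model, hashVal)
            print 'buckets=%d  reg=%g  step=%g  val logloss=%.4f' % (numBuckets, regParam, stepSize, valLogLoss)
            if valLogLoss < bestLogLoss:
                bestLogLoss, bestParams = valLogLoss, (numBuckets, regParam, stepSize)

print 'best params (buckets, reg, step):', bestParams, ' validation log loss:', bestLogLoss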


In [ ]: