In [15]:
%matplotlib inline
import pyspark

import matplotlib.pyplot as plt
import numpy as np
import scipy

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
import pandas as pd

In [16]:
sc = pyspark.SparkContext("local", "task")


---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-16-70661cbf6d76> in <module>()
----> 1 sc = pyspark.SparkContext("local", "task")

/usr/local/spark/python/pyspark/context.pyc in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    106         """
    107         self._callsite = first_spark_call() or CallSite(None, None, None)
--> 108         SparkContext._ensure_initialized(self, gateway=gateway)
    109         try:
    110             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,

/usr/local/spark/python/pyspark/context.pyc in _ensure_initialized(cls, instance, gateway)
    237                         " created by %s at %s:%s "
    238                         % (currentAppName, currentMaster,
--> 239                             callsite.function, callsite.file, callsite.linenum))
    240                 else:
    241                     SparkContext._active_spark_context = instance

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=task, master=local) created by __init__ at <ipython-input-2-70661cbf6d76>:1 

In [17]:
from math import isnan
from operator import or_

def parsePoints(df):
    data = list()
    for i in xrange(len(df)):
        survived = float(df["Survived"][i])
        age_ = float(df["Age"][i])
        
        # Example of handling NaN values
        # age = 23.0 if isnan(age_) else age_
        
        pClass = float(df["Pclass"][i])
        nameL = float(len(df["Name"][i]))
        sex = 1.0 if df["Sex"][i] == "male" else 0.0
        sibSp = float(df["SibSp"][i])
        parch = float(df["Parch"][i])
        fare = float(df["Fare"][i])
        features = [age, pClass, nameL, sex, sibSp, parch, fare]
    
        if reduce(or_, map(isnan, features)):
            continue
        
        p = LabeledPoint(survived, features)
        data.append(p)
    return data

df = pd.read_csv("./train.csv")
data = parsePoints(df)
print "N:", len(data)
train, test = sc.parallelize(data).randomSplit([0.5, 0.5], seed=2l)

print "Train positive:", train.filter(lambda p: p.label == 1.0).count()
print "Train negative:", train.filter(lambda p: p.label == 0.0).count()

print "Test positive:", test.filter(lambda p: p.label == 1.0).count()
print "Test negative:", test.filter(lambda p: p.label == 0.0).count()


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-17-fd0013bfc42a> in <module>()
     27 
     28 df = pd.read_csv("./train.csv")
---> 29 data = parsePoints(df)
     30 print "N:", len(data)
     31 train, test = sc.parallelize(data).randomSplit([0.5, 0.5], seed=2l)

<ipython-input-17-fd0013bfc42a> in parsePoints(df)
     17         parch = float(df["Parch"][i])
     18         fare = float(df["Fare"][i])
---> 19         features = [age, pClass, nameL, sex, sibSp, parch, fare]
     20 
     21         if reduce(or_, map(isnan, features)):

NameError: global name 'age' is not defined

In [18]:
### data: [(real, predicted)]
def roc_curve(data):
    from sklearn.metrics import roc_curve, auc

    # Compute ROC curve and area the curve
    fpr, tpr, thresholds = roc_curve([ r for r, p in data ], [ p for r, p in data ])
    roc_auc = auc(fpr, tpr)

    # Plot ROC curve

    fig = plt.figure(figsize=(7,5))
    ax = fig.add_subplot(111)

    plt.plot(fpr, tpr, color='lightblue', lw=2, label='ROC curve')
    plt.plot([0, 1], [0, 1], color='black', lw=2, linestyle='dotted', label='random guessing')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.0])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic (ROC)')
    plt.legend(loc="lower right")
    ax.annotate('AUC = %0.2f' %roc_auc, xy=(0.35, 0.6))
    plt.show()

In [19]:
xs = train.map(lambda p: p.features[0]).collect()
ys = train.map(lambda p: p.label).collect()

plt.figure()
plt.ylim([-0.1, 1.1])
plt.plot(xs, ys, "x")
plt.show()


Tasks

All tasks should be done using spark and mllib if it's possible.

  • Train logistic regression classifier on Titanic dataset
  • Plot projection of the dataset on w (normal to separation hyperplane) and logistic curve
  • Calculate different metrics: MSE, l1 loss, precision, recall, accuracy.
  • Plot error curve by number of iterations: for example: 5, 10, 25, 50, 100, 150
  • Suggest a way to deal with NaNs (see parsePoints function) in the age column and compare results.

In [ ]: