In [19]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

from sklearn.preprocessing import LabelEncoder
from sklearn.externals import joblib # used to store and load models to disk

In [20]:
def get_imr_labels(line):
    """
    Parses a line of the IMR dataset and keeps only the labels
    :param line: a raw semicolon-separated CSV line
    :return: a tuple of the 2 labels, or None for non-data lines
    """
    sl = line.split(";")
    if not sl[1].isdigit():
        return
    label1 = int(sl[1])
    label2 = int(sl[2])
    return (label1, label2)


def parse_imr_line(line):
    """
    Parses a line of the IMR csv dataset into tuples
    :param line: a raw semicolon-separated CSV line
    :return: ( (label1 (int), label2 (int)), features (list of float) ), or None for non-data lines
    """
    sl = line.split(";")
    if not sl[1].isdigit():
        return

    label1 = int(sl[1])
    label2 = int(sl[2])
    features = map(float, sl[3:])
    return ((label1, label2), features)

def parse_imr_line_encoding_labels(L1_encoder, L2_encoder, line):
    """
    Parses a line of the IMR csv dataset into encoded tuples (with consecutive class IDs)
    :param L1_encoder: the LabelEncoder for the first label
    :param L2_encoder: the LabelEncoder for the second label
    :param line: a raw semicolon-separated CSV line
    :return: ( (label1 (int), label2 (int)), features (list of float) ), or None for non-data lines
    """
    rv = parse_imr_line(line)
    if rv is None: return

    (label1, label2), features = rv[0], rv[1]
    l1, l2 = L1_encoder.transform(label1), L2_encoder.transform(label2)
    return ((l1, l2), features)

def create_labeled_point(labels_and_features, wanted_category):
    """
    Creates a LabeledPoint using the 'wanted' category as its label
    :param labels_and_features: a ((label1, label2), features) tuple
    :param wanted_category: 0 for the first label, 1 for the second
    :return: a LabeledPoint with the selected label and the features
    """
    labels = labels_and_features[0]
    features = labels_and_features[1]

    return LabeledPoint(labels[wanted_category], features)

def get_labels_from_csv(raw_data_rdd):
    """
    Given an RDD of raw IMR csv lines, returns the distinct cat1 and cat2 labels
    :param raw_data_rdd: the RDD of raw CSV lines
    :return: (labels_1, labels_2)
    """
    label_tuples_rdd = raw_data_rdd.map(get_imr_labels).filter(lambda t: t is not None)
    l1_rdd = label_tuples_rdd.map(lambda (l1, l2): l1)
    l2_rdd = label_tuples_rdd.map(lambda (l1, l2): l2)
    labels_1 = l1_rdd.distinct().collect()
    labels_2 = l2_rdd.distinct().collect()
    return labels_1, labels_2

def create_label_encoders(raw_data_rdd):
    """Fits a LabelEncoder for each of the two labels found in the raw data RDD"""
    labels_1, labels_2 = get_labels_from_csv(raw_data_rdd)
    L1_encoder = LabelEncoder()
    L1_encoder.fit(labels_1)
    L2_encoder = LabelEncoder()
    L2_encoder.fit(labels_2)
    return L1_encoder, L2_encoder

def store_label_encoders(enc1, enc2, le_path):
    joblib.dump( (enc1, enc2), le_path)
    
def load_label_encoders(le_path):
    (l1e, l2e) = joblib.load(le_path)
    return l1e, l2e

def calculate_error(data_rdd, model):
    """Returns the fraction of points in data_rdd whose prediction differs from the true label"""
    labelsAndPreds = data_rdd.map(lambda p: (p.label, model.predict(p.features)))
    err = labelsAndPreds.filter(lambda (l, p): l != p).count() / float(data_rdd.count())
    return err

In [21]:
in_file = "/home/cmantas/Data/Result_W2V_IMR_New.csv"
encoders_path= "/home/cmantas/Data/spark_lr_data/preprocess/label_encoders"

raw_data = sc.textFile(in_file)

l1e, l2e = create_label_encoders(raw_data)


store_label_encoders(l1e, l2e, encoders_path)
l1e, l2e = load_label_encoders(encoders_path)

classes_1_count = len(l1e.classes_); classes_2_count = len(l2e.classes_)

print "Class_counts: ", classes_1_count, classes_2_count

# lambda closure including the encoders
encoding_mapper = lambda line: parse_imr_line_encoding_labels(l1e, l2e, line)

encoded_tuple_data = raw_data.map(encoding_mapper).filter(lambda l: l is not None)

# lambda closure for creating LabeledPoints for the first category
labeled_point_1_mapper = lambda labels_and_features: create_labeled_point(labels_and_features, 0)

# the final RDD of LabeledPoints
labeled_data_1 = encoded_tuple_data.map(labeled_point_1_mapper)


Class_counts:  5789 52
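
As a quick sanity check (an added sketch, not part of the original run), the first encoded LabeledPoint can be inspected to confirm that its label is a consecutive class ID produced by the LabelEncoder; the `sample` variable below is new and purely illustrative.


In [ ]:
# Hypothetical added cell: peek at the first encoded LabeledPoint
sample = labeled_data_1.take(1)[0]
print "label:", sample.label
print "features:", sample.features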

LogisticRegressionWithLBFGS


In [4]:
def train(*args, **kwargs):
    """Thin wrapper: trains an unregularized LogisticRegressionWithLBFGS model with an intercept"""
    return LogisticRegressionWithLBFGS.train(*args, regType=None, intercept=True, **kwargs)

Split the dataset into two training sets and one test set (40/40/20)


In [5]:
(training1Data, training2Data, testData) = labeled_data_1.randomSplit([0.4, 0.4, 0.2])

Train with the first half of the training data


In [6]:
# Build the model with the first half of the training data
model1 = train(training1Data, numClasses=classes_1_count)
print "Training(1) Error: ", calculate_error(training1Data, model1)
print "Training(2) Error: ", calculate_error(training2Data, model1)

# Evaluating the model on test data
print "Test Error: ", calculate_error(testData, model1)

# Save and load the model:
# model.save(sc, "myModelPath")
# sameModel = LogisticRegressionModel.load(sc, "myModelPath")


Training(1) Error:  0.0
Training(2) Error:  0.527777777778
Test Error:  0.461538461538

Train with the second half of the training data


In [7]:
# Build a new model, initialized with the first model's weights, and train with more data
model2 = train(training2Data, initialWeights=model1.weights, numClasses=classes_1_count)
print "Training(1) Error: ", calculate_error(training1Data, model2)
print "Training(2) Error: ", calculate_error(training2Data, model2)

# Evaluating the model on test data
print "Test Error: ", calculate_error(testData, model2)


Training(1) Error:  0.135135135135
Training(2) Error:  0.0
Test Error:  0.461538461538

Train with both halves of the training data


In [8]:
# Build yet another model with both training datasets
all_training = training1Data.union(training2Data)

model3 = train(all_training, numClasses=classes_1_count)
print "Training(1) Error: ", calculate_error(training1Data, model3)
print "Training(2) Error: ", calculate_error(training2Data, model3)

# Evaluating the model on test data
print "Test Error: ", calculate_error(testData, model3)


Training(1) Error:  0.0
Training(2) Error:  0.0
Test Error:  0.423076923077

Train with new splits (80% training, 20% test)


In [9]:
(training_new, test_new) = labeled_data_1.randomSplit([0.8, 0.2])
model4 = train(training_new, numClasses=classes_1_count)
print "Training(1) Error: ", calculate_error(training1Data, model4)
print "Training(2) Error: ", calculate_error(training2Data, model4)

# Evaluating the model on test data
print "Test Error: (new)", calculate_error(test_new, model4)


Training(1) Error:  0.027027027027
Training(2) Error:  0.0277777777778
Test Error: (new) 0.166666666667
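
To turn a prediction back into an original IMR category ID, the encoded class index returned by the model has to be passed through the fitted LabelEncoder. The cell below is a minimal added sketch (not part of the original run), assuming `model4`, `test_new` and the `l1e` encoder from above are still in scope; the variable names in it are new and illustrative.


In [ ]:
# Hypothetical added cell: predict the first category for one held-out point
# and decode the class index back to the original IMR cat1 ID.
sample_point = test_new.take(1)[0]
encoded_prediction = model4.predict(sample_point.features)
original_category = l1e.inverse_transform([encoded_prediction])[0]
print "predicted (encoded class ID):", encoded_prediction
print "predicted (original IMR cat1 ID):", original_category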
