In [19]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from sklearn.preprocessing import LabelEncoder
from sklearn.externals import joblib # used to store and load models to/from disk (newer scikit-learn versions ship joblib as a standalone package)
In [20]:
def get_imr_labels(line):
    """
    Parses a line of the IMR dataset and keeps only the labels
    :param line: a semicolon-separated line of the IMR csv file
    :return: a tuple of the 2 integer labels, or None for non-data rows (e.g. a header)
    """
    sl = line.split(";")
    if not sl[1].isdigit():
        return
    label1 = int(sl[1])
    label2 = int(sl[2])
    return (label1, label2)
def parse_imr_line(line):
    """
    Parses a line of the IMR csv dataset into tuples
    :param line: a semicolon-separated line of the IMR csv file
    :return: ( (label1 (int), label2 (int)), features (list of float) ), or None for non-data rows
    """
    sl = line.split(";")
    if not sl[1].isdigit():
        return
    label1 = int(sl[1])
    label2 = int(sl[2])
    features = map(float, sl[3:])
    return ((label1, label2), features)
def parse_imr_line_encoding_labels(L1_encoder, L2_encoder, line):
    """
    Parses a line of the IMR csv dataset into encoded tuples (with consecutive class IDs)
    :param L1_encoder: the LabelEncoder for the first label
    :param L2_encoder: the LabelEncoder for the second label
    :param line: a semicolon-separated line of the IMR csv file
    :return: ( (encoded label1 (int), encoded label2 (int)), features (list of float) )
    """
    rv = parse_imr_line(line)
    if rv is None: return
    (label1, label2), features = rv
    # LabelEncoder.transform expects an array-like, so wrap/unwrap the single values
    l1, l2 = L1_encoder.transform([label1])[0], L2_encoder.transform([label2])[0]
    return ((l1, l2), features)
def create_labeled_point(labels_and_features, wanted_category):
    """
    Creates a LabeledPoint with the 'wanted' category as its label
    :param labels_and_features: a ((label1, label2), features) tuple
    :param wanted_category: 0 for the first label, 1 for the second
    """
    labels = labels_and_features[0]
    features = labels_and_features[1]
    return LabeledPoint(labels[wanted_category], features)
def get_labels_from_csv(raw_data_rdd):
    """
    Given an RDD of IMR csv lines, returns the sets of unique cat1 and cat2 labels
    :param raw_data_rdd: the RDD of raw csv lines
    :return: (list of distinct label1 values, list of distinct label2 values)
    """
    label_tuples_rdd = raw_data_rdd.map(get_imr_labels).filter(lambda line: line is not None)
    l1_rdd = label_tuples_rdd.map(lambda (l1, l2): l1)
    l2_rdd = label_tuples_rdd.map(lambda (l1, l2): l2)
    labels_1 = l1_rdd.distinct().collect()
    labels_2 = l2_rdd.distinct().collect()
    return labels_1, labels_2
def create_label_encoders(raw_data_rdd):
    """Fits a LabelEncoder for each of the two labels found in the raw data RDD"""
    labels_1, labels_2 = get_labels_from_csv(raw_data_rdd)
    L1_encoder = LabelEncoder(); L1_encoder.fit(labels_1)
    L2_encoder = LabelEncoder(); L2_encoder.fit(labels_2)
    return L1_encoder, L2_encoder
def store_label_encoders(enc1, enc2, le_path):
    """Persists the two label encoders to disk"""
    joblib.dump((enc1, enc2), le_path)
def load_label_encoders(le_path):
    """Loads the two label encoders from disk"""
    (l1e, l2e) = joblib.load(le_path)
    return l1e, l2e
def calculate_error(data_rdd, model):
    """Returns the misclassification rate of the model on the given RDD of LabeledPoints"""
    labelsAndPreds = data_rdd.map(lambda p: (p.label, model.predict(p.features)))
    err = labelsAndPreds.filter(lambda (l, p): l != p).count() / float(data_rdd.count())
    return err
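A quick sanity check of the parsers on a made-up line; the semicolon-separated `<id>;<label1>;<label2>;<feature>;...` layout is assumed from the parsing code above, not taken from the actual file:
In [ ]:
# Hypothetical sample line (format assumed from the parsers above)
sample_line = "42;3;17;0.5;1.25;-0.75"
print get_imr_labels(sample_line)   # (3, 17)
print parse_imr_line(sample_line)   # ((3, 17), [0.5, 1.25, -0.75])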
In [21]:
in_file = "/home/cmantas/Data/Result_W2V_IMR_New.csv"
encoders_path= "/home/cmantas/Data/spark_lr_data/preprocess/label_encoders"
raw_data = sc.textFile(in_file)
l1e, l2e = create_label_encoders(raw_data)
store_label_encoders(l1e, l2e, encoders_path)
l1e, l2e = load_label_encoders(encoders_path)
classes_1_count = len(l1e.classes_); classes_2_count = len(l2e.classes_)
print "Class_counts: ", classes_1_count, classes_2_count
# lambda closure including the encoders
encoding_mapper = lambda line: parse_imr_line_encoding_labels(l1e, l2e, line)
encoded_tuple_data = raw_data.map(encoding_mapper).filter(lambda l: l is not None)
# lambda closure for creating LabeledPoints for the first category
labeled_point_1_mapper = lambda tuples: create_labeled_point(tuples, 0)
# the final RDD of LabeledPoints
labeled_data_1 = encoded_tuple_data.map(labeled_point_1_mapper)
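The same encoded tuples could also be turned into LabeledPoints for the second category by passing index 1 to create_labeled_point; the names below (labeled_point_2_mapper, labeled_data_2) are only illustrative and are not used in the following cells:
In [ ]:
# Illustrative only: LabeledPoints keyed on the second category (index 1)
labeled_point_2_mapper = lambda tuples: create_labeled_point(tuples, 1)
labeled_data_2 = encoded_tuple_data.map(labeled_point_2_mapper)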
In [4]:
def train(*args, **kwargs):
    # thin wrapper around MLlib's LBFGS logistic regression: no regularization, fit an intercept
    return LogisticRegressionWithLBFGS.train(*args, regType=None, intercept=True, **kwargs)
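If regularization were wanted, the same wrapper could enable it instead of hard-coding regType=None; the variant below is only a sketch and the regParam value is arbitrary:
In [ ]:
# Hypothetical variant of the wrapper with L2 regularization (regParam chosen arbitrarily)
def train_l2(*args, **kwargs):
    return LogisticRegressionWithLBFGS.train(*args, regType="l2", regParam=0.01,
                                             intercept=True, **kwargs)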
In [5]:
(training1Data, training2Data, testData) = labeled_data_1.randomSplit([0.4, 0.4, 0.2])
In [6]:
# Build the model with the first of the two training splits
model1 = train(training1Data, numClasses=classes_1_count)
print "Training(1) Error: ", calculate_error(training1Data, model1)
print "Training(2) Error: ", calculate_error(training2Data, model1)
# Evaluating the model on test data
print "Test Error: ", calculate_error(testData, model1)
# # Save and load model
# model.save(sc, "myModelPath")
# sameModel = LogisticRegressionModel.load(sc, "myModelPath")
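Following the commented MLlib snippet above, the first model could be persisted and reloaded like this (the path is hypothetical):
In [ ]:
# Persist model1 and load it back (hypothetical path)
model1_path = "/tmp/imr_lr_model1"
model1.save(sc, model1_path)
same_model1 = LogisticRegressionModel.load(sc, model1_path)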
In [7]:
# Build a new model warm-started from the 1st one's weights, trained on the additional data
model2 = train(training2Data, initialWeights=model1.weights, numClasses=classes_1_count)
print "Training(1) Error: ", calculate_error(training1Data, model2)
print "Training(2) Error: ", calculate_error(training2Data, model2)
# Evaluating the model on test data
print "Test Error: ", calculate_error(testData, model2)
In [8]:
## build yet another model using both training splits
all_training = training1Data.union(training2Data)
model3 = train(all_training, numClasses=classes_1_count)
print "Training(1) Error: ", calculate_error(training1Data, model3)
print "Training(2) Error: ", calculate_error(training2Data, model3)
# Evaluating the model on test data
print "Test Error: ", calculate_error(testData, model3)
In [9]:
(training_new, test_new) = labeled_data_1.randomSplit([0.8, 0.2])
model4 = train(training_new, numClasses=classes_1_count)
# note: training1Data/training2Data come from the earlier split, so they partly overlap training_new
print "Training(1) Error: ", calculate_error(training1Data, model4)
print "Training(2) Error: ", calculate_error(training2Data, model4)
# Evaluating the model on the new test data
print "Test Error (new): ", calculate_error(test_new, model4)