The first set of methods covers the principles from the following summary: http://sci2s.ugr.es/ssl A batch-generative method, consisting of KMeans and Logistic Regression, is implemented to cover a naive approach. This experiment is compared to a baseline which consists of only Logistic Regression.
In [2]:
%run -i initilization.py
In [3]:
from pyspark.sql import functions as F
from pyspark.ml import clustering
from pyspark.ml import feature
from pyspark.sql import DataFrame
from pyspark.sql import Window
from pyspark.ml import Pipeline
from pyspark.ml import classification
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from shared import Plot2DGraphs, create_dummy_data
from semisupervised import batch_generative_model
In [4]:
# Parameters for two synthetic 2-D Gaussian clusters:
# cluster 1 centred at (3, 3) with per-axis std 2,
# cluster 2 centred at (-3, -3) with per-axis std 1.
mean_1 = [3.0, 3.0]
std_1 = [2, 2]
mean_2 = [-3.0, -3.0]
std_2 = [1. , 1.0]
# 300 samples per cluster.
n_1 = 300
n_2 = 300
# Packed into lists so the data generator receives one entry per cluster.
n = [n_1, n_2]
mean = [mean_1, mean_2]
std = [std_1, std_2]
In [5]:
def compute_error_rate(data_frame, truth_label='real_label', found_label='prediction'):
    """
    Print and return the misclassification rate of predictions vs ground truth.

    Builds a (truth, prediction) contingency table, shows it, and computes
    errors / total as the error rate.

    :param data_frame: Spark DataFrame containing both label columns.
    :param truth_label: name of the ground-truth label column.
    :param found_label: name of the predicted label column.
    :return: error rate as a float (wrong predictions / total rows).
    """
    # Contingency table: one row per (truth, prediction) pair with its count.
    # Count found_label (was hard-coded to 'prediction', which ignored the
    # parameter and would fail for a differently named prediction column).
    df_stats = (data_frame
                .groupBy([truth_label, found_label])
                .agg(F.count(found_label).alias('Prediction Count'))
                )
    # Total number of predictions.
    n = (df_stats
         .select(F.sum(F.col('Prediction Count')).alias('n'))
         .collect()[0]['n']
         )
    # Rows where the prediction disagrees with the truth.
    wrong_guess = (df_stats
                   .filter((F.col(truth_label) != F.col(found_label)))
                   .select(F.sum(F.col('Prediction Count')).alias('errors'))
                   .collect()[0]['errors']
                   )
    # Spark's sum over an empty (perfect-classification) result is None;
    # treat that as zero errors instead of crashing on None / n below.
    wrong_guess = wrong_guess or 0
    df_stats.show()
    print(n)
    print(wrong_guess)
    print('Error-rate: {}'.format(wrong_guess/n))
    return wrong_guess / n
In [6]:
# Generate labelled dummy data for the two clusters defined above.
# The 0.01 argument is presumably the fraction of points that keep a
# visible label for semi-supervised training — confirm in create_dummy_data.
tester = create_dummy_data.create_labeled_data_with_clusters(n, mean, std, 0.01)
df_tester = spark.createDataFrame(tester)
The dataset with labels and available labels plotted
In [7]:
# Scatter plot of the generated data, distinguishing known vs unknown labels.
Plot2DGraphs.plot_known_and_unknown_data(tester)
In [8]:
# --- Baseline: Logistic Regression trained on the labelled subset only ---

# Split into labelled (train) and unlabelled (test) rows.
# NOTE(review): comparing a column against np.NaN relies on Spark SQL's
# non-standard NaN semantics (NaN = NaN evaluates true); the conventional,
# engine-agnostic check is F.isnan(...) / ~F.isnan(...) — confirm these
# filters select the intended rows. `np` is presumably brought into scope
# by `%run -i initilization.py`.
df_train = df_tester.filter((F.col('used_label') != np.NaN))
df_test = df_tester.filter((F.col('used_label') == np.NaN))

# Assemble the raw x/y columns into a single ML feature vector.
vec_assembler = feature.VectorAssembler(
    inputCols=['x','y'],
    outputCol='features')
lg = classification.LogisticRegression(
    featuresCol=vec_assembler.getOutputCol(),
    labelCol='used_label')
pipeline = Pipeline(stages=[vec_assembler, lg])

# Cross-validation gets built here: grid over the regularisation strength.
param_grid = (ParamGridBuilder()
              .addGrid(lg.regParam, [0.1, 0.01])
              .build()
              )
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol=lg.getRawPredictionCol(),
    labelCol=lg.getLabelCol())
cross_validator = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3)

# Fit on the labelled rows only, then predict the unlabelled rows
# and plot the resulting predictions.
cross_validator_model = cross_validator.fit(df_train)
df_without_semisupervised = cross_validator_model.transform(df_test)
Plot2DGraphs.plot_known_and_unknown_data(
    df_without_semisupervised.toPandas(),
    labelCol='prediction')
In [9]:
# Error rate of the purely supervised baseline on the unlabelled rows.
compute_error_rate(df_without_semisupervised)
In [10]:
# Semi-supervised batch-generative approach (KMeans + Logistic Regression)
# run on the full dataset, using x/y as the feature columns.
df_output = batch_generative_model.semi_supervised_batch_single_classifier_generate_approach(df_tester,['x','y'])
In [13]:
# Inspect the first five rows of the semi-supervised output.
df_output.limit(5).toPandas()
Out[13]:
In [14]:
# Error rate and prediction plot for the semi-supervised model,
# for comparison with the supervised baseline above.
compute_error_rate(df_output)
Plot2DGraphs.plot_known_and_unknown_data(df_output.toPandas(), labelCol='prediction')
In [ ]:
In [1]:
# One-off conversion: re-export the double-helix parquet dataset as CSV.
# NOTE(review): write.csv uses mode='error' by default (fails if the output
# path already exists) and writes no header row — confirm that is intended.
df = spark.read.parquet('/home/svanhmic/workspace/data/DABAI/sparkdata/parquet/double_helix.parquet/')
df.write.csv('/home/svanhmic/workspace/data/DABAI/sparkdata/csv/double_helix.csv/')
In [ ]: