Outlier Detection/Cleaning workflow

The following notebook contains the processes that make up the cleaning workflow. The notebook consists of: an input step, selecting the relevant columns, choosing the desired algorithm, and executing the workflow. The available algorithms are KMeans and Gaussian Mixture models.

Note: Spark has other clustering methods available, such as Bisecting KMeans and Latent Dirichlet Allocation (which is used for topic modelling). These are not included in this workflow due to instability in these algorithms.

Initial imports and a change of the working directory to the base project folder.

In [1]:
sc.addPyFile('../dist_workflow/shared.zip')

In [2]:
%run -i initilization.py


/home/svanhmic/workspace/DABAI/Workflows
Data importation

This workflow uses our homemade GeneralDataImport class, which handles data import.


In [3]:
from shared.Extension_to_timeit import pretty_time_result
from shared.GeneralDataImport import GeneralDataImport

dataIO = GeneralDataImport(
    parquet_path+"/normal_cluster_n_1000.parquet",
)
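
For reference, a minimal sketch of what the import step boils down to in plain Spark, assuming GeneralDataImport is essentially a convenience wrapper around spark.read.parquet plus the column-selection widgets used below:

In [ ]:
# Hedged sketch: plain-Spark equivalent of the import step above (our assumption
# about what GeneralDataImport wraps; the real class also provides the widgets).
df_raw = spark.read.parquet(parquet_path + "/normal_cluster_n_1000.parquet")
df_raw.printSchema()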


Select the relevant columns from the imported dataset.

Select, respectively, the columns that are features, the id column, and the columns assigned to be labels.


In [4]:
dataIO.select_features()



In [5]:
dataIO.select_id()



In [6]:
dataIO.select_labels()


An initial look at the data

  • Let's look at the raw numbers...
  • We also have a plot showing how the data is distributed in 2D.

In [7]:
print(dataIO.list_features)
print(dataIO.list_label)
print(dataIO.list_id)
df = dataIO.data_frame
df.limit(5).toPandas()
#df.limit(5).toPandas()


['a', 'b']
['k', 'dimension']
['id']
Out[7]:
id k dimension a b
0 0 1.0 2.0 5.681480 3.158786
1 1 1.0 2.0 2.984444 5.840077
2 2 1.0 2.0 4.563474 4.873891
3 3 1.0 2.0 5.353377 4.905738
4 4 1.0 2.0 3.980736 5.922937

In [8]:
#ax = sb.regplot('a','b',df.toPandas(),fit_reg=False)
ax = sb.lmplot(
    'a',
    'b',
    dataIO._data_frame.toPandas(),
    fit_reg=False,
    size=8,
    hue='k',
    scatter_kws={'alpha':0.7,'s':60}
)
ax.ax.set_title('An initial look at data',fontsize=20)
plt.show()



In [9]:
#import data!
#from pyspark.sql import functions as F
#from shared.create_dummy_data import create_dummy_data 

#test_timer = %timeit -o feature_data = create_dummy_data(1000, "x y z", "label", outlier_number=0.2, outlier_factor=20)
#feature_data = feature_data.select([(10*F.col(i)).alias(i) for i in ["x","y","z"]])
#feature_data.orderBy('x',ascending=[0,0,0]).show()

Let's select an algorithm for this test instance.

Note: We also need to set some parameters; this can be done via the fancy widgets.


In [10]:
# Select parameters
from cleaning.CreateParametersCleaning import ParamsCleaning

params = ParamsCleaning()
parameters = params.select_parameters()

In [11]:
parameters


Just to verify that we have the correct algorithm and parameters


In [15]:
from pyspark.ml import clustering

clustering.__all__


Out[15]:
['BisectingKMeans',
 'BisectingKMeansModel',
 'KMeans',
 'KMeansModel',
 'GaussianMixture',
 'GaussianMixtureModel',
 'LDA',
 'LDAModel',
 'LocalLDAModel',
 'DistributedLDAModel']

In [14]:
#print(params.output_parameters(parameters))
#test_params_1 = {'tol': 0.00001, 'k': 3, 'maxIter': 300, 'algorithm': 'GaussianMixture', 'seed': 1080866016001745000}
test_params_1 = params.output_parameters(parameters)
print(test_params_1)


{'tol': 0.001, 'k': 2, 'maxIter': 100, 'algorithm': 'GaussianMixture'}
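
In a non-interactive run the widget step can be skipped and a parameter dict of the same shape built by hand; a minimal sketch, assuming ExecuteWorkflow accepts the same keys that params.output_parameters() returns:

In [ ]:
# Hedged sketch: build the parameter dict manually instead of via the widgets.
# We assume the accepted keys are the ones printed above; the values are examples.
manual_params = {
    'algorithm': 'GaussianMixture',  # or 'KMeans'
    'k': 2,
    'maxIter': 100,
    'tol': 0.001,
}
# This dict could then be passed as dict_params to ExecuteWorkflow (see below).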

OK! So far so good.

Let's create the pipeline; we have all the ingredients needed. Furthermore, we have some hacks! In order to performance-test the algorithms properly, we have to import several datasets. This is usually done below, but for this showcase we only use df.


In [16]:
from cleaning.ExecuteCleaningWorkflow import ExecuteWorkflow

partitions = [80]
sizes = [1000]
features = dataIO.list_features
labels = dataIO.list_label
#print(features)
#print(labels)

execution_model = ExecuteWorkflow(
    dict_params = test_params_1,
    cols_features = features, 
    cols_labels = labels)

# This is hardcoded at the moment; use the commented lines below for testing purposes!
collection_of_data = [parquet_path+'/normal_cluster_n_{}.parquet'.format(i) for i in sizes]
collection_of_model = []
collection_of_transformed = []
#collection_of_data
#counts = [i.rdd.getNumPartitions() for i in collection_of_data]
#counts
#collection_of_data.append(df)

The actual execution phase happens here.

There is a lot of code here! It handles multiple datasets and partition sizes. The pipeline is also executed r times (the %timeit repeat count) for each partition size and each dataset. The execution-time results, which are important for performance testing, are added to a log. The last trained machine-learning model in each test instance is saved to a list: collection_of_model.


In [18]:
for jdx, partition_size in enumerate(partitions):
    for idx, data in enumerate(collection_of_data):

        df_data = (
            spark.
            read.
            parquet(data).
            repartition(partition_size)
        )
        
        iteration = idx+jdx*len(collection_of_data)
        logger_tester.info(
            'Iteration {} for data size {}'.
            format(iteration, sizes[idx])
        )

        model_timer = %timeit -r1 -o collection_of_model.append(execution_model.execute_pipeline(df_data)) 
        transformer_timer = %timeit -o execution_model.apply_model(sc, collection_of_model[iteration], df_data)
        collection_of_model = collection_of_model[:iteration+1]
        logger_tester.info(
            'Iteration '+str(iteration)+' for training model : '+pretty_time_result(model_timer))
        logger_tester.info(
            'Iteration '+str(iteration)+' for transforming model : '+pretty_time_result(transformer_timer))
        #merged_df.write.parquet('/home/svanhmic/workspace/data/DABAI/sparkdata/parquet/merged_df_parquet')


4.14 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
183 ms ± 36 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Let's import the data again and apply the model.

In order to reproduce our results, we apply our model to our own dataset. Note: This is actually cheating; we should have a test set, held out from the original dataset, in order to test the robustness of the model.
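
A less "cheating" setup would hold out part of the data before training. A minimal sketch using Spark's randomSplit (the split fractions and seed are our own arbitrary choices, not part of the workflow above):

In [ ]:
# Hedged sketch: train on a random 80% split and score the held-out 20%,
# instead of applying the model to the data it was trained on.
df_all = spark.read.parquet(data).repartition(partition_size)
df_train, df_test = df_all.randomSplit([0.8, 0.2], seed=42)

model_holdout = execution_model.execute_pipeline(df_train)
df_test_results = execution_model.apply_model(sc, model_holdout, df_test)

For this showcase, though, we simply reload the full dataset and apply the trained model below.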


In [20]:
df_data = (
    spark.
    read.
    parquet(data).
    repartition(partition_size)
)

df_results = execution_model.apply_model(sc,
    collection_of_model[iteration],
    df_data
)
df_results.limit(5).toPandas()


Out[20]:
prediction id k dimension a b features casted_features scaled_features probability mean cov centers
0 0 0 7 2 5.543376 -9.073114 [5.5433761888, -9.07311431512] [5.5433761888, -9.07311431512] [5.5433761888, -9.07311431512] [1.0, 4.76160583605e-13] [5.76836669331, 1.48512161755] DenseMatrix([[ 3.26283039, -2.68503552],\n ... [5.76836669331, 1.48512161755]
1 0 58 0 2 2.974459 0.373379 [2.97445873775, 0.373378969246] [2.97445873775, 0.373378969246] [2.97445873775, 0.373378969246] [0.999977124896, 2.28751044469e-05] [5.76836669331, 1.48512161755] DenseMatrix([[ 3.26283039, -2.68503552],\n ... [5.76836669331, 1.48512161755]
2 0 126 0 2 5.226935 1.476878 [5.22693466329, 1.47687790614] [5.22693466329, 1.47687790614] [5.22693466329, 1.47687790614] [0.999999987284, 1.27161212483e-08] [5.76836669331, 1.48512161755] DenseMatrix([[ 3.26283039, -2.68503552],\n ... [5.76836669331, 1.48512161755]
3 0 130 8 2 7.340900 -2.580350 [7.34090008255, -2.58035014908] [7.34090008255, -2.58035014908] [7.34090008255, -2.58035014908] [1.0, 3.43399021042e-14] [5.76836669331, 1.48512161755] DenseMatrix([[ 3.26283039, -2.68503552],\n ... [5.76836669331, 1.48512161755]
4 0 6 8 2 6.729923 -5.518585 [6.72992314934, -5.51858492447] [6.72992314934, -5.51858492447] [6.72992314934, -5.51858492447] [1.0, 5.95098011583e-14] [5.76836669331, 1.48512161755] DenseMatrix([[ 3.26283039, -2.68503552],\n ... [5.76836669331, 1.48512161755]

Let's take a look at the results. Are there any outliers?


In [21]:
execution_model.parameters


Out[21]:
{'algorithm': 'GaussianMixture',
 'featuresCol': 'scaled_features',
 'k': 2,
 'maxIter': 100,
 'predictionCol': 'prediction',
 'probabilityCol': 'probability',
 'seed': -7090211980209472397,
 'tol': 0.001}

In [22]:
from cleaning.ShowCleaning import ShowResults
results = ShowResults(sc,
    execution_model.parameters,
    list_features=execution_model.features,
    list_labels=execution_model.labels)

prepared_df = results.prepare_table_data(df_results)

In [23]:
new_df = results.prepare_table_data(prepared_df, prediction_col='prediction')
summary_df = results.compute_summary(new_df)
summary_df.toPandas()


Out[23]:
Prediction count outlier_count outlier percentage
0 2 382 20 5.0
1 1 618 13 2.0

In [24]:
df_with_dists = results.select_cluster(new_df)


There appear to be no outliers in the data when using the $\chi^2$-distribution.
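
The computed_boundary column that appears in the table below is presumably a distance cutoff derived from a $\chi^2$ quantile; the following is a minimal sketch of how such a boundary could be computed, as our own guess at the idea rather than the workflow's actual code:

In [ ]:
# Hedged sketch: one common way to turn the chi^2 distribution into a distance
# boundary for outlier flagging. This is our guess at what 'computed_boundary'
# represents, not the workflow's actual implementation; the quantile is arbitrary.
from scipy.stats import chi2

dim = len(execution_model.features)       # degrees of freedom = number of features
boundary = chi2.ppf(0.99, df=dim) ** 0.5  # cutoff on the (Mahalanobis-like) distance
print(boundary)                           # points with a larger distance would be flagged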


In [25]:
new_df.limit(5).toPandas()


Out[25]:
id k dimension a b features casted_features scaled_features prediction centers rowId distance computed_boundary is_outlier
0 131 8 2 6.665449 -4.020492 [6.66544900916, -4.02049227422] [6.66544900916, -4.02049227422] [6.66544900916, -4.02049227422] 6 [7.04262949413, -5.43828904252] 566935683072 1.467110 3.089498 False
1 132 8 2 6.965311 -6.706283 [6.96531074815, -6.70628275891] [6.96531074815, -6.70628275891] [6.96531074815, -6.70628275891] 6 [7.04262949413, -5.43828904252] 566935683073 1.270349 3.089498 False
2 9 8 2 6.926694 -5.055103 [6.92669416872, -5.05510305871] [6.92669416872, -5.05510305871] [6.92669416872, -5.05510305871] 6 [7.04262949413, -5.43828904252] 566935683074 0.400340 3.089498 False
3 133 8 2 7.994157 -5.382574 [7.99415658723, -5.38257406943] [7.99415658723, -5.38257406943] [7.99415658723, -5.38257406943] 6 [7.04262949413, -5.43828904252] 566935683075 0.953157 3.089498 False
4 135 8 2 7.896947 -5.801625 [7.89694682972, -5.80162539674] [7.89694682972, -5.80162539674] [7.89694682972, -5.80162539674] 6 [7.04262949413, -5.43828904252] 566935683076 0.928370 3.089498 False

In [27]:
from shared import Plot2DGraphs
from pyspark.sql import functions as F

if test_params_1['algorithm'] == 'GaussianMixture':
    Plot2DGraphs.plot_gaussians(
        new_df,
        execution_model.features,
        gaussian_std=2.5)
else:
    pdf_with_dists = new_df.toPandas()
    pdf_original = (dataIO.data_frame
                    .withColumn('k', F.col('k').cast('integer'))
                    .toPandas())
    
    pdf_with_dists.loc[:,'origin'] = 'kmeans'
    pdf_original.loc[:,'origin'] = 'original'
    pdf_original['prediction'] = pdf_original['k']
    pdf_merged = pd.concat([pdf_original, pdf_with_dists])
    g = sb.FacetGrid(pdf_merged,col="origin", hue="prediction",size=8)
    g.map(plt.scatter, "a", "b", alpha=.7)
    g.add_legend();
    g.set_titles(template='{col_name}')
    plt.show()


It appears that K-means creates clusters that resemble the input data, e.g. cluster 6 and cluster 7.

Gaussian Mixture Model example

The following example shows the results for the Gaussian Mixture model in Apache Spark on our synthetically generated dataset, which contains 10 multivariate normal distributions in 2D.


In [ ]:
df_with_dists = results.select_prototypes(df_results)

Again, it appears that we have no outliers for the $\chi^2$-distribution. Let's take a look at the plot of our Gaussian Mixture Model with $\mu$ and $\sigma$.


In [ ]:
if test_params_1['algorithm'] == 'GaussianMixture':
    Plot2DGraphs.plot_gaussians(
        df_results,
        execution_model.features,
        gaussian_std=2)
else:
    pdf_with_dists = df_with_dists.toPandas()
    pdf_original = (dataIO.data_frame
                    .withColumn('k', F.col('k').cast('integer'))
                    .toPandas())
    
    pdf_with_dists.loc[:,'origin'] = 'kmeans'
    pdf_original.loc[:,'origin'] = 'original'
    pdf_original['prediction'] = pdf_original['k']
    pdf_merged = pd.concat([pdf_original, pdf_with_dists])
    g = sb.FacetGrid(pdf_merged,col="origin", hue="prediction",size=8)
    g.map(plt.scatter, "a", "b", alpha=.7)
    g.add_legend();
    g.set_titles(template='{col_name}')
    plt.show()

$\mu$ is indicated with an x, and each ellipse indicates the covariance matrix. It appears that there are some outliers contained in each cluster at $2\sigma$.
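
For readers who want to reproduce the ellipses by hand, here is a minimal sketch of drawing a $2\sigma$ covariance ellipse with matplotlib from a mean vector and a 2x2 covariance matrix; the mean and covariance values are placeholders, and Plot2DGraphs.plot_gaussians may do this differently:

In [ ]:
# Hedged sketch: draw an n-sigma covariance ellipse for one 2D Gaussian component.
# Our own illustration of the plotting idea, not the Plot2DGraphs implementation.
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse

def add_cov_ellipse(ax, mean, cov, n_std=2.0, **kwargs):
    """Add an n_std-sigma ellipse for a 2D Gaussian (mean, cov) to an axis."""
    eigvals, eigvecs = np.linalg.eigh(np.asarray(cov))   # principal axes of the covariance
    angle = np.degrees(np.arctan2(eigvecs[1, 0], eigvecs[0, 0]))
    width, height = 2 * n_std * np.sqrt(eigvals)         # full axis lengths
    ax.add_patch(Ellipse(tuple(mean), width, height, angle=angle, fill=False, **kwargs))

# Placeholder mean and covariance, not taken from the fitted model above.
fig, ax = plt.subplots(figsize=(6, 6))
add_cov_ellipse(ax, mean=(3.0, 1.0), cov=[[2.0, 0.8], [0.8, 1.0]], n_std=2.0, color='red')
ax.plot(3.0, 1.0, 'x', color='red')                      # mark the mean with an 'x'
ax.set_xlim(-2, 8); ax.set_ylim(-4, 6); ax.set_aspect('equal')
plt.show()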