In [ ]:
# Here comes the Spark includes
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml import clustering
from pyspark.ml import Pipeline
from pyspark.ml import feature
from pyspark.ml import linalg
from pyspark.ml import tuning

# Here comes the scipy includes
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sb

# Python imports
import sys
import os

# Adding workspace to the pythonpath
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [ ]:
# Import the data...
file_loc = '/home/svanhmic/workspace/data/DABAI/sparkdata/parquet'
df = spark.read.parquet(
    file_loc+'/normal_cluster_data.parquet')

In [ ]:
# Show the data
df.show(3)

In [ ]:
# Create the pipeline as may times before
vector_assembler = feature.VectorAssembler(
    inputCols=['a','b'],
    outputCol='features')

gaussian_mm = clustering.GaussianMixture()

gaussian_pipeline = Pipeline(stages=[vector_assembler,gaussian_mm])

In [ ]:
gaussian_parameter_map = (tuning.ParamGridBuilder()
                          .baseOn({gaussian_mm.featuresCol:vector_assembler.getOutputCol()})
                          .addGrid(gaussian_mm.maxIter,[50,100,200])
                          .addGrid(gaussian_mm.k,[2,3,4,5])
                          .build()
                         )
#gaussian_parameter_map

In [ ]:
models = gaussian_pipeline.fit(df,params=gaussian_parameter_map)

In [ ]:
def convert_covariance_to_pandas(model):
    
    pdf = (model.
           stages[-1].
           gaussiansDF.
           toPandas())
    
    pdf['prediction'] = pd.Series(
        pdf.index,
        index=pdf.index)
    
    pdf['centers'] = pdf['mean']
    
    return pdf

In [ ]:
list_gaussians_df = list(map(lambda x: convert_covariance_to_pandas(x), models))
list_transformed_df = [i.transform(df).toPandas() for i in models]

In [ ]:
# local import
from shared import Plot2DGraphs

In [ ]:
for transformed, gaussians in zip(list_transformed_df,list_gaussians_df):
    Plot2DGraphs.plot_gaussians(transformed,featuresCol=['a','b'],pandasMeanCov=gaussians)