In [ ]:
# Spark imports
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml import clustering
from pyspark.ml import Pipeline
from pyspark.ml import feature
from pyspark.ml import linalg
from pyspark.ml import tuning
# Scientific Python stack imports (NumPy, pandas, plotting)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sb
# Standard library imports
import sys
import os
# Add the workspace root to the Python path so local modules (e.g. `shared`) can be imported
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
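In [ ]:
# The cells below assume an active SparkSession bound to `spark`, as provided by a
# pyspark/Livy notebook kernel. A minimal sketch for creating one locally, in case
# the environment does not provide it (the app name here is illustrative):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('gaussian_mixture_clustering').getOrCreate()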
In [ ]:
# Load the data
file_loc = '/home/svanhmic/workspace/data/DABAI/sparkdata/parquet'
df = spark.read.parquet(
    file_loc + '/normal_cluster_data.parquet')
In [ ]:
# Show the data
df.show(3)
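In [ ]:
# Optional check: the VectorAssembler below expects numeric columns named 'a' and 'b'
df.printSchema()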
In [ ]:
# Create the pipeline as many times before: assemble the feature vector, then fit a Gaussian mixture
vector_assembler = feature.VectorAssembler(
    inputCols=['a', 'b'],
    outputCol='features')
gaussian_mm = clustering.GaussianMixture()
gaussian_pipeline = Pipeline(stages=[vector_assembler, gaussian_mm])
In [ ]:
# Build the parameter grid: fix the features column to the assembler output,
# and sweep over maxIter and the number of components k
gaussian_parameter_map = (tuning.ParamGridBuilder()
                          .baseOn({gaussian_mm.featuresCol: vector_assembler.getOutputCol()})
                          .addGrid(gaussian_mm.maxIter, [50, 100, 200])
                          .addGrid(gaussian_mm.k, [2, 3, 4, 5])
                          .build())
#gaussian_parameter_map
In [ ]:
# Fit the pipeline once for every combination in the parameter grid
models = gaussian_pipeline.fit(df, params=gaussian_parameter_map)
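In [ ]:
# Sanity check (optional): 3 maxIter values x 4 values of k gives 12 combinations,
# so `models` should be a list of 12 fitted PipelineModels.
len(gaussian_parameter_map), len(models)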
In [ ]:
def convert_covariance_to_pandas(model):
    """Pull the per-component means and covariances out of the fitted
    GaussianMixture stage and return them as a pandas DataFrame, adding a
    `prediction` column (component index) and `centers` as an alias for the means."""
    pdf = (model
           .stages[-1]
           .gaussiansDF
           .toPandas())
    pdf['prediction'] = pd.Series(pdf.index, index=pdf.index)
    pdf['centers'] = pdf['mean']
    return pdf
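In [ ]:
# Example usage: the per-component parameters of the first fitted model
# ('mean' and 'cov' columns come straight from gaussiansDF).
convert_covariance_to_pandas(models[0])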
In [ ]:
list_gaussians_df = [convert_covariance_to_pandas(model) for model in models]
list_transformed_df = [model.transform(df).toPandas() for model in models]
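In [ ]:
# A rough way to compare the fits (a sketch; assumes the training summary was
# retained on each GaussianMixture stage): print the log-likelihood per grid
# point. It generally keeps increasing with k, so it is only a coarse guide.
for param_map, pipeline_model in zip(gaussian_parameter_map, models):
    gmm_model = pipeline_model.stages[-1]
    if gmm_model.hasSummary:
        print(param_map[gaussian_mm.k],
              param_map[gaussian_mm.maxIter],
              gmm_model.summary.logLikelihood)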
In [ ]:
# Local import: plotting helpers from the workspace `shared` package
from shared import Plot2DGraphs
In [ ]:
# Plot the transformed data together with the fitted Gaussian components for each model
for transformed, gaussians in zip(list_transformed_df, list_gaussians_df):
    Plot2DGraphs.plot_gaussians(transformed, featuresCol=['a', 'b'], pandasMeanCov=gaussians)