In [ ]:
from IPython.display import display, HTML

In [ ]:
%%HTML
<script>
  /**
   * Toggle the visibility of every notebook code-input cell (`div.input`)
   * and relabel the toggle button so it names the next available action.
   *
   * State lives in the global `code_shown` flag, which is flipped on each call.
   */
  function code_toggle() {
    // jQuery only accepts the strings 'fast' and 'slow' as named durations;
    // a numeric value is needed to get a 500 ms animation.
    var ANIMATION_MS = 500;

    // Read through `window` so a click that arrives before the flag has been
    // initialised sees `undefined` (falsy) instead of raising a ReferenceError.
    if (window.code_shown) {
      $('div.input').hide(ANIMATION_MS);
      $('#toggleButton').val('Show Code');
    } else {
      $('div.input').show(ANIMATION_MS);
      $('#toggleButton').val('Hide Code');
    }
    window.code_shown = !window.code_shown;
  }

  // When the document is ready, collapse all code-input cells and record
  // that state in the global flag that code_toggle() reads and flips.
  $(document).ready(function () {
    $('div.input').hide();
    window.code_shown = false;
  });
</script>
<form action="javascript:code_toggle()"><input type="submit" id="toggleButton" value="Show Code"></form>

In [ ]:
# Imports from the standard library and general-purpose Python packages

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as md
import numpy as np
import seaborn as sb
from ipywidgets import widgets


import os
import re
import getpass

# Login name of the account running this kernel.
user = getpass.getuser()

# Absolute path of the parent of the current working directory.
module_path = os.path.abspath(os.pardir)

In [ ]:
# Imports from PySpark

from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans

# Register the project-local helper module with the SparkContext, then import it.
# The import follows the registration — presumably it relies on addPyFile making
# the file importable; verify before reordering.
sc.addPyFile(path=module_path + "/test/sample/ConvertAllToVecToMl.py")
from ConvertAllToVecToMl import ConvertAllToVecToMl

In [ ]:
# Load the small "flights" example data set through seaborn as the input
# for the rest of the notebook.
data_Pdf = sb.load_dataset(name="flights")

In [ ]:
# Create the Spark DataFrame from the pandas frame and derive the time columns
# used downstream: a numeric `month`, a `datetime` timestamp and a `timestamp`
# holding the same instant as Unix seconds.
flightDf = sqlContext.createDataFrame(data_Pdf)
#flightDf.show()

import datetime as dt


def _month_number(month_abbr):
    """Return the month number (1-12) for a three-letter month abbreviation.

    Null input yields None instead of raising inside the UDF.
    Parsing uses strptime('%b'), so it follows the locale of the Python
    process that evaluates the UDF.
    """
    if month_abbr is None:
        return None
    return dt.datetime.strptime(month_abbr, '%b').month


# Declare the return type explicitly; a UDF without one produces a string
# column, which is not the numeric month this step is meant to create.
func = F.udf(_month_number, IntegerType())

# Spark's substring position is 1-based, so pos=1 selects the first three
# characters of the month name.
flightInfoDf = flightDf.withColumn('month', func(F.substring(str='month', pos=1, len=3)))

flightInfoDf = (flightInfoDf
               # Build a zero-padded "yyyy-MM-01 12:00:00" string so the text
               # matches the parse pattern used two steps below exactly.
               .withColumn("timestamp", F.concat(F.col("year"),
                                                 F.lit("-"),
                                                 F.lpad(F.col("month").cast("string"), 2, "0"),
                                                 F.lit("-01 12:00:00")))
               # Timestamp-typed copy of the same instant, used for plotting.
               .withColumn("datetime", F.from_utc_timestamp("timestamp", "UTC"))
               # Replace the string with Unix seconds so it can be a model feature.
               .withColumn("timestamp", F.unix_timestamp(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
              )

In [ ]:
# Create the Spark ML pipeline: assemble the feature vector, run the
# project-local vector conversion stage, then cluster with K-means.

# Fixed seed so the random centroid initialisation, and therefore the cluster
# assignments, are reproducible from one run to the next.
KMEANS_SEED = 42

myCol = ["timestamp", "passengers"]
vectorAssembler = VectorAssembler(inputCols=myCol, outputCol="features")

# ConvertAllToVecToMl is defined outside this notebook; it reads and writes the
# same "features" column. Presumably it converts the vector type — TODO confirm.
convertion = ConvertAllToVecToMl(inputCol=vectorAssembler.getOutputCol(),
                                 outputCol=vectorAssembler.getOutputCol())

# NOTE(review): the two features are on very different scales (Unix seconds vs.
# passenger counts) and StandardScaler is imported but unused — confirm whether
# scaling before K-means was intended.
clusters = KMeans(featuresCol=vectorAssembler.getOutputCol(),
                  k=4,
                  maxIter=10,
                  initMode="random",
                  seed=KMEANS_SEED)

pipeline = Pipeline(stages=[vectorAssembler, convertion, clusters])

In [ ]:
# Fit the pipeline on the flight data, then apply the fitted model to the same
# frame to obtain the output of all pipeline stages.
model = pipeline.fit(dataset=flightInfoDf)
prediction = model.transform(dataset=flightInfoDf)

In [ ]:
# Preview only the first rows: collecting and rendering the whole Spark
# DataFrame in the notebook does not scale and clutters the output.
prediction.limit(10).toPandas()

In [ ]:
# Bring the clustered data back to pandas for visualisation; the chart is
# drawn only when the button below is clicked.
def on_button_clicked(b):
    """Button callback: scatter-plot passengers against date, coloured by cluster.

    `b` is the argument supplied by the button's click event; it is not used.
    """
    clustered_pdf = prediction.toPandas()
    # NOTE(review): newer seaborn releases renamed `size` to `height` — confirm
    # against the installed seaborn version.
    grid = sb.FacetGrid(clustered_pdf, hue="prediction", size=5, palette="Set2")
    # Show only the year on the date axis.
    grid.ax.yaxis.set_major_formatter(md.DateFormatter('%Y'))
    grid.map(plt.scatter, "passengers", "datetime",
             s=50, alpha=.7, linewidth=.5, edgecolor="white")
    plt.ylabel('year')
    plt.show()


button = widgets.Button(description="Visualize data")
button.on_click(on_button_clicked)
display(button)

In [ ]:


In [ ]: