In [1]:
# imports
from __future__ import print_function # treat print as function

from IPython.display import display

from numpy import array
from math import sqrt

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

from pyspark.ml.clustering import KMeans, KMeansModel

# For plotting (testing)
%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
pd.__version__


Out[1]:
u'0.18.0'

In [2]:
ds = spark.read.option("inferSchema", "true").option("header", "true").option("nullValue", "?").csv("data/mtcars.csv")

ds.printSchema()
ds.show()


root
 |-- _c0: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: integer (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: integer (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: integer (nullable = true)
 |-- am: integer (nullable = true)
 |-- gear: integer (nullable = true)
 |-- carb: integer (nullable = true)

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|                _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          Merc 280C|17.8|  6|167.6|123|3.92| 3.44| 18.9|  1|  0|   4|   4|
|         Merc 450SE|16.4|  8|275.8|180|3.07| 4.07| 17.4|  0|  0|   3|   3|
|         Merc 450SL|17.3|  8|275.8|180|3.07| 3.73| 17.6|  0|  0|   3|   3|
|        Merc 450SLC|15.2|  8|275.8|180|3.07| 3.78| 18.0|  0|  0|   3|   3|
| Cadillac Fleetwood|10.4|  8|472.0|205|2.93| 5.25|17.98|  0|  0|   3|   4|
|Lincoln Continental|10.4|  8|460.0|215| 3.0|5.424|17.82|  0|  0|   3|   4|
|  Chrysler Imperial|14.7|  8|440.0|230|3.23|5.345|17.42|  0|  0|   3|   4|
|           Fiat 128|32.4|  4| 78.7| 66|4.08|  2.2|19.47|  1|  1|   4|   1|
|        Honda Civic|30.4|  4| 75.7| 52|4.93|1.615|18.52|  1|  1|   4|   2|
|     Toyota Corolla|33.9|  4| 71.1| 65|4.22|1.835| 19.9|  1|  1|   4|   1|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 20 rows


In [3]:
localds = ds.collect()

In [4]:
type(ds)


Out[4]:
pyspark.sql.dataframe.DataFrame

In [5]:
# vector assembler
assembler = VectorAssembler().setInputCols(["mpg", "cyl", "disp", "hp", "drat", "wt"]).setOutputCol("features")

assemdata = assembler.transform(ds)

scaled = StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(True).setWithMean(True)

# Compute summary statistics by fitting the StandardScaler.
scalerModel = scaled.fit(assemdata)
    
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(assemdata)
    
clusters = 5
# Trains a k-means model
kmeans = KMeans().setK(clusters).setMaxIter(1000).setFeaturesCol("scaledFeatures").setPredictionCol("prediction")
    
model = kmeans.fit(scaledData)

In [6]:
# Evaluate clustering by computing Within Set Sum of Squared Errors.
WSSSE = model.computeCost(scaledData)
print("Within Set Sum of Squared Errors = " + str(WSSSE))

# Shows the result.
print("Cluster Centers: ")
for center in model.clusterCenters():
    print(center)


Within Set Sum of Squared Errors = 30.282086364
Cluster Centers: 
[-0.68023084  1.01488215  0.75804294  0.88231895 -0.59716191  0.42555388]
[ 1.65523937 -1.22485777 -1.16244468 -1.03828065  1.22522951 -1.37384616]
[-0.05928235 -0.10498781 -0.17257356 -0.24338976 -0.82896352 -0.070434  ]
[ 0.20434841 -0.72713779 -0.7034812  -0.64529178  0.52802995 -0.30379451]
[-1.37006186  1.01488215  1.82841574  1.02065693 -1.01599172  2.16914561]

In [7]:
# predict
predict = model.transform(scaledData)
predict.toPandas()


Out[7]:
_c0 mpg cyl disp hp drat wt qsec vs am gear carb features scaledFeatures prediction
0 Mazda RX4 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 [21.0, 6.0, 160.0, 110.0, 3.9, 2.62] [0.150884824648, -0.104987808575, -0.570619818... 3
1 Mazda RX4 Wag 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 [21.0, 6.0, 160.0, 110.0, 3.9, 2.875] [0.150884824648, -0.104987808575, -0.570619818... 3
2 Datsun 710 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 [22.8, 4.0, 108.0, 93.0, 3.85, 2.32] [0.449543446631, -1.22485776671, -0.9901820908... 3
3 Hornet 4 Drive 21.4 6 258.0 110 3.08 3.215 19.44 1 0 3 1 [21.4, 6.0, 258.0, 110.0, 3.08, 3.215] [0.217253407311, -0.104987808575, 0.2200936943... 2
4 Hornet Sportabout 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2 [18.7, 8.0, 360.0, 175.0, 3.15, 3.44] [-0.230734525664, 1.01488214956, 1.04308122834... 0
5 Valiant 18.1 6 225.0 105 2.76 3.460 20.22 1 0 3 1 [18.1, 6.0, 225.0, 105.0, 2.76, 3.46] [-0.330287399658, -0.104987808575, -0.04616697... 2
6 Duster 360 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4 [14.3, 8.0, 360.0, 245.0, 3.21, 3.57] [-0.960788934956, 1.01488214956, 1.04308122834... 0
7 Merc 240D 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2 [24.4, 4.0, 146.7, 62.0, 3.69, 3.19] [0.715017777282, -1.22485776671, -0.6779309382... 3
8 Merc 230 22.8 4 140.8 95 3.92 3.150 22.90 1 0 4 2 [22.8, 4.0, 140.8, 95.0, 3.92, 3.15] [0.449543446631, -1.22485776671, -0.7255351191... 3
9 Merc 280 19.2 6 167.6 123 3.92 3.440 18.30 1 0 4 4 [19.2, 6.0, 167.6, 123.0, 3.92, 3.44] [-0.147773797335, -0.104987808575, -0.50929917... 3
10 Merc 280C 17.8 6 167.6 123 3.92 3.440 18.90 1 0 4 4 [17.8, 6.0, 167.6, 123.0, 3.92, 3.44] [-0.380063836655, -0.104987808575, -0.50929917... 3
11 Merc 450SE 16.4 8 275.8 180 3.07 4.070 17.40 0 0 3 3 [16.4, 8.0, 275.8, 180.0, 3.07, 4.07] [-0.612353875976, 1.01488214956, 0.36371308754... 0
12 Merc 450SL 17.3 8 275.8 180 3.07 3.730 17.60 0 0 3 3 [17.3, 8.0, 275.8, 180.0, 3.07, 3.73] [-0.463024564984, 1.01488214956, 0.36371308754... 0
13 Merc 450SLC 15.2 8 275.8 180 3.07 3.780 18.00 0 0 3 3 [15.2, 8.0, 275.8, 180.0, 3.07, 3.78] [-0.811459623964, 1.01488214956, 0.36371308754... 0
14 Cadillac Fleetwood 10.4 8 472.0 205 2.93 5.250 17.98 0 0 3 4 [10.4, 8.0, 472.0, 205.0, 2.93, 5.25] [-1.60788261592, 1.01488214956, 1.94675381466,... 4
15 Lincoln Continental 10.4 8 460.0 215 3.00 5.424 17.82 0 0 3 4 [10.4, 8.0, 460.0, 215.0, 3.0, 5.424] [-1.60788261592, 1.01488214956, 1.84993175184,... 4
16 Chrysler Imperial 14.7 8 440.0 230 3.23 5.345 17.42 0 0 3 4 [14.7, 8.0, 440.0, 230.0, 3.23, 5.345] [-0.894420352293, 1.01488214956, 1.68856164714... 4
17 Fiat 128 32.4 4 78.7 66 4.08 2.200 19.47 1 1 4 1 [32.4, 4.0, 78.7, 66.0, 4.08, 2.2] [2.04238943054, -1.22485776671, -1.22658929427... 1
18 Honda Civic 30.4 4 75.7 52 4.93 1.615 18.52 1 1 4 2 [30.4, 4.0, 75.7, 52.0, 4.93, 1.615] [1.71054651723, -1.22485776671, -1.25079480998... 1
19 Toyota Corolla 33.9 4 71.1 65 4.22 1.835 19.90 1 1 4 1 [33.9, 4.0, 71.1, 65.0, 4.22, 1.835] [2.29127161553, -1.22485776671, -1.28790993406... 1
20 Toyota Corona 21.5 4 120.1 97 3.70 2.465 20.01 1 0 3 1 [21.5, 4.0, 120.1, 97.0, 3.7, 2.465] [0.233845552976, -1.22485776671, -0.8925531775... 3
21 Dodge Challenger 15.5 8 318.0 150 2.76 3.520 16.87 0 0 3 2 [15.5, 8.0, 318.0, 150.0, 2.76, 3.52] [-0.761683186967, 1.01488214956, 0.70420400846... 0
22 AMC Javelin 15.2 8 304.0 150 3.15 3.435 17.30 0 0 3 2 [15.2, 8.0, 304.0, 150.0, 3.15, 3.435] [-0.811459623964, 1.01488214956, 0.59124493517... 0
23 Camaro Z28 13.3 8 350.0 245 3.73 3.840 15.41 0 0 3 4 [13.3, 8.0, 350.0, 245.0, 3.73, 3.84] [-1.12671039161, 1.01488214956, 0.962396175986... 0
24 Pontiac Firebird 19.2 8 400.0 175 3.08 3.845 17.05 0 0 3 2 [19.2, 8.0, 400.0, 175.0, 3.08, 3.845] [-0.147773797335, 1.01488214956, 1.36582143774... 0
25 Fiat X1-9 27.3 4 79.0 66 4.08 1.935 18.90 1 1 4 1 [27.3, 4.0, 79.0, 66.0, 4.08, 1.935] [1.19619000159, -1.22485776671, -1.2241687427,... 1
26 Porsche 914-2 26.0 4 120.3 91 4.43 2.140 16.70 0 1 5 2 [26.0, 4.0, 120.3, 91.0, 4.43, 2.14] [0.980492107934, -1.22485776671, -0.8909394764... 1
27 Lotus Europa 30.4 4 95.1 113 3.77 1.513 16.90 1 1 5 2 [30.4, 4.0, 95.1, 113.0, 3.77, 1.513] [1.71054651723, -1.22485776671, -1.09426580842... 1
28 Ford Pantera L 15.8 8 351.0 264 4.22 3.170 14.50 0 1 5 4 [15.8, 8.0, 351.0, 264.0, 4.22, 3.17] [-0.71190674997, 1.01488214956, 0.970464681221... 0
29 Ferrari Dino 19.7 6 145.0 175 3.62 2.770 15.50 0 1 5 6 [19.7, 6.0, 145.0, 175.0, 3.62, 2.77] [-0.0648130690067, -0.104987808575, -0.6916473... 2
30 Maserati Bora 15.0 8 301.0 335 3.54 3.570 14.60 0 1 5 8 [15.0, 8.0, 301.0, 335.0, 3.54, 3.57] [-0.844643915296, 1.01488214956, 0.56703941947... 0
31 Volvo 142E 21.4 4 121.0 109 4.11 2.780 18.60 1 1 4 2 [21.4, 4.0, 121.0, 109.0, 4.11, 2.78] [0.217253407311, -1.22485776671, -0.8852915228... 3

In [8]:
for i in range(clusters):
    predictionsPerCol = predict.filter(predict["prediction"] == i)
    print("Cluster " + str(i))
    gorts = predictionsPerCol.select(predict["_c0"], predict["features"], predict["prediction"]).collect
    for gort in gorts():
        print (gort)
    print("======================================================")


Cluster 0
Row(_c0=u'Hornet Sportabout', features=DenseVector([18.7, 8.0, 360.0, 175.0, 3.15, 3.44]), prediction=0)
Row(_c0=u'Duster 360', features=DenseVector([14.3, 8.0, 360.0, 245.0, 3.21, 3.57]), prediction=0)
Row(_c0=u'Merc 450SE', features=DenseVector([16.4, 8.0, 275.8, 180.0, 3.07, 4.07]), prediction=0)
Row(_c0=u'Merc 450SL', features=DenseVector([17.3, 8.0, 275.8, 180.0, 3.07, 3.73]), prediction=0)
Row(_c0=u'Merc 450SLC', features=DenseVector([15.2, 8.0, 275.8, 180.0, 3.07, 3.78]), prediction=0)
Row(_c0=u'Dodge Challenger', features=DenseVector([15.5, 8.0, 318.0, 150.0, 2.76, 3.52]), prediction=0)
Row(_c0=u'AMC Javelin', features=DenseVector([15.2, 8.0, 304.0, 150.0, 3.15, 3.435]), prediction=0)
Row(_c0=u'Camaro Z28', features=DenseVector([13.3, 8.0, 350.0, 245.0, 3.73, 3.84]), prediction=0)
Row(_c0=u'Pontiac Firebird', features=DenseVector([19.2, 8.0, 400.0, 175.0, 3.08, 3.845]), prediction=0)
Row(_c0=u'Ford Pantera L', features=DenseVector([15.8, 8.0, 351.0, 264.0, 4.22, 3.17]), prediction=0)
Row(_c0=u'Maserati Bora', features=DenseVector([15.0, 8.0, 301.0, 335.0, 3.54, 3.57]), prediction=0)
======================================================
Cluster 1
Row(_c0=u'Fiat 128', features=DenseVector([32.4, 4.0, 78.7, 66.0, 4.08, 2.2]), prediction=1)
Row(_c0=u'Honda Civic', features=DenseVector([30.4, 4.0, 75.7, 52.0, 4.93, 1.615]), prediction=1)
Row(_c0=u'Toyota Corolla', features=DenseVector([33.9, 4.0, 71.1, 65.0, 4.22, 1.835]), prediction=1)
Row(_c0=u'Fiat X1-9', features=DenseVector([27.3, 4.0, 79.0, 66.0, 4.08, 1.935]), prediction=1)
Row(_c0=u'Porsche 914-2', features=DenseVector([26.0, 4.0, 120.3, 91.0, 4.43, 2.14]), prediction=1)
Row(_c0=u'Lotus Europa', features=DenseVector([30.4, 4.0, 95.1, 113.0, 3.77, 1.513]), prediction=1)
======================================================
Cluster 2
Row(_c0=u'Hornet 4 Drive', features=DenseVector([21.4, 6.0, 258.0, 110.0, 3.08, 3.215]), prediction=2)
Row(_c0=u'Valiant', features=DenseVector([18.1, 6.0, 225.0, 105.0, 2.76, 3.46]), prediction=2)
Row(_c0=u'Ferrari Dino', features=DenseVector([19.7, 6.0, 145.0, 175.0, 3.62, 2.77]), prediction=2)
======================================================
Cluster 3
Row(_c0=u'Mazda RX4', features=DenseVector([21.0, 6.0, 160.0, 110.0, 3.9, 2.62]), prediction=3)
Row(_c0=u'Mazda RX4 Wag', features=DenseVector([21.0, 6.0, 160.0, 110.0, 3.9, 2.875]), prediction=3)
Row(_c0=u'Datsun 710', features=DenseVector([22.8, 4.0, 108.0, 93.0, 3.85, 2.32]), prediction=3)
Row(_c0=u'Merc 240D', features=DenseVector([24.4, 4.0, 146.7, 62.0, 3.69, 3.19]), prediction=3)
Row(_c0=u'Merc 230', features=DenseVector([22.8, 4.0, 140.8, 95.0, 3.92, 3.15]), prediction=3)
Row(_c0=u'Merc 280', features=DenseVector([19.2, 6.0, 167.6, 123.0, 3.92, 3.44]), prediction=3)
Row(_c0=u'Merc 280C', features=DenseVector([17.8, 6.0, 167.6, 123.0, 3.92, 3.44]), prediction=3)
Row(_c0=u'Toyota Corona', features=DenseVector([21.5, 4.0, 120.1, 97.0, 3.7, 2.465]), prediction=3)
Row(_c0=u'Volvo 142E', features=DenseVector([21.4, 4.0, 121.0, 109.0, 4.11, 2.78]), prediction=3)
======================================================
Cluster 4
Row(_c0=u'Cadillac Fleetwood', features=DenseVector([10.4, 8.0, 472.0, 205.0, 2.93, 5.25]), prediction=4)
Row(_c0=u'Lincoln Continental', features=DenseVector([10.4, 8.0, 460.0, 215.0, 3.0, 5.424]), prediction=4)
Row(_c0=u'Chrysler Imperial', features=DenseVector([14.7, 8.0, 440.0, 230.0, 3.23, 5.345]), prediction=4)
======================================================

In [9]:
ds.toPandas()


Out[9]:
_c0 mpg cyl disp hp drat wt qsec vs am gear carb
0 Mazda RX4 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4
1 Mazda RX4 Wag 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4
2 Datsun 710 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1
3 Hornet 4 Drive 21.4 6 258.0 110 3.08 3.215 19.44 1 0 3 1
4 Hornet Sportabout 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2
5 Valiant 18.1 6 225.0 105 2.76 3.460 20.22 1 0 3 1
6 Duster 360 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4
7 Merc 240D 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2
8 Merc 230 22.8 4 140.8 95 3.92 3.150 22.90 1 0 4 2
9 Merc 280 19.2 6 167.6 123 3.92 3.440 18.30 1 0 4 4
10 Merc 280C 17.8 6 167.6 123 3.92 3.440 18.90 1 0 4 4
11 Merc 450SE 16.4 8 275.8 180 3.07 4.070 17.40 0 0 3 3
12 Merc 450SL 17.3 8 275.8 180 3.07 3.730 17.60 0 0 3 3
13 Merc 450SLC 15.2 8 275.8 180 3.07 3.780 18.00 0 0 3 3
14 Cadillac Fleetwood 10.4 8 472.0 205 2.93 5.250 17.98 0 0 3 4
15 Lincoln Continental 10.4 8 460.0 215 3.00 5.424 17.82 0 0 3 4
16 Chrysler Imperial 14.7 8 440.0 230 3.23 5.345 17.42 0 0 3 4
17 Fiat 128 32.4 4 78.7 66 4.08 2.200 19.47 1 1 4 1
18 Honda Civic 30.4 4 75.7 52 4.93 1.615 18.52 1 1 4 2
19 Toyota Corolla 33.9 4 71.1 65 4.22 1.835 19.90 1 1 4 1
20 Toyota Corona 21.5 4 120.1 97 3.70 2.465 20.01 1 0 3 1
21 Dodge Challenger 15.5 8 318.0 150 2.76 3.520 16.87 0 0 3 2
22 AMC Javelin 15.2 8 304.0 150 3.15 3.435 17.30 0 0 3 2
23 Camaro Z28 13.3 8 350.0 245 3.73 3.840 15.41 0 0 3 4
24 Pontiac Firebird 19.2 8 400.0 175 3.08 3.845 17.05 0 0 3 2
25 Fiat X1-9 27.3 4 79.0 66 4.08 1.935 18.90 1 1 4 1
26 Porsche 914-2 26.0 4 120.3 91 4.43 2.140 16.70 0 1 5 2
27 Lotus Europa 30.4 4 95.1 113 3.77 1.513 16.90 1 1 5 2
28 Ford Pantera L 15.8 8 351.0 264 4.22 3.170 14.50 0 1 5 4
29 Ferrari Dino 19.7 6 145.0 175 3.62 2.770 15.50 0 1 5 6
30 Maserati Bora 15.0 8 301.0 335 3.54 3.570 14.60 0 1 5 8
31 Volvo 142E 21.4 4 121.0 109 4.11 2.780 18.60 1 1 4 2

In [14]:
# predict.groupBy("prediction", "model").count().toPandas()
predict.createOrReplaceTempView("mypredict")
# result = sqlContext.sql("select id,concat_ws(';', collect_list(value)),concat_ws(';', collect_list(value)) from data group by id")
predgroups = spark.sql("select prediction, concat_ws(',', collect_list(_c0)) AS models, count(*) FROM mypredict GROUP BY prediction ORDER BY count(*) DESC ")
# predgroups.show(100, False)

In [11]:
spark.sql("DROP TABLE default.gorto")
spark.sql("CREATE TABLE default.gorto AS select _c0 AS model, mpg, cyl, disp, hp, drat, wt, qsec, vs, am, gear, carb, prediction FROM mypredict").toPandas();