1. PySpark Clustering Demo


In [14]:
from pyspark import SparkConf, SparkContext
## set up the SQLContext (requires an existing SparkContext, `sc`)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
## set up a SparkSession
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
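
A side note, not from the original run: in Spark 2.0+ the SparkSession subsumes the SQLContext, so a DataFrame can be created straight from `spark`. A minimal sketch:

In [ ]:
# sketch: DataFrame creation via the SparkSession alone (no SQLContext needed)
demo = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])
demo.show()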

In [15]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors  # for Spark 2.0 and above
#from pyspark.mllib.linalg import Vectors  # for Spark versions below 2.0
  • Create the dataset

In [16]:
# load data
data = [(Vectors.dense([0.0, 0.0]),), \
        (Vectors.dense([1.0, 1.0]),),\
        (Vectors.dense([9.0, 8.0]),), \
        (Vectors.dense([8.0, 9.0]),)]
df = sqlContext.createDataFrame(data, ["features"])
df.show()


+---------+
| features|
+---------+
|[0.0,0.0]|
|[1.0,1.0]|
|[9.0,8.0]|
|[8.0,9.0]|
+---------+


In [17]:
# fit the k-means model
kmeans = KMeans(k=2, seed=1)
Kmodel = kmeans.fit(df)
# number of cluster centers
centers = Kmodel.clusterCenters()
len(centers)


Out[17]:
2

In [18]:
transformed = Kmodel.transform(df).select("features", "prediction")
rows = transformed.collect()
rows


Out[18]:
[Row(features=DenseVector([0.0, 0.0]), prediction=1),
 Row(features=DenseVector([1.0, 1.0]), prediction=1),
 Row(features=DenseVector([9.0, 8.0]), prediction=0),
 Row(features=DenseVector([8.0, 9.0]), prediction=0)]

In [19]:
rows[0].prediction == rows[1].prediction  # True: the two points near the origin share a cluster
rows[2].prediction == rows[3].prediction  # True: only this last expression is echoed below


Out[19]:
True
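
Before moving on, a quick quality check is possible if your Spark is 2.3 or newer: ClusteringEvaluator (added in 2.3) computes the silhouette score for the fitted model. A sketch, not part of the original run:

In [ ]:
# sketch, assuming Spark 2.3+ where ClusteringEvaluator is available
from pyspark.ml.evaluation import ClusteringEvaluator
predictions = Kmodel.transform(df)
evaluator = ClusteringEvaluator()          # default metric: silhouette, squared Euclidean
print(evaluator.evaluate(predictions))     # values near 1.0 indicate well-separated clusters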

2. PySpark Clustering on the Iris Dataset


In [20]:
from pyspark.ml.clustering import KMeans
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors

In [21]:
from pyspark import SparkContext
#sc = SparkContext()  # not needed here: `sc` already exists in this session
sqlContext = SQLContext(sc)
  • Load data

In [22]:
data = sqlContext.read.format('com.databricks.spark.csv') \
                 .options(header='true', inferschema='true') \
                 .load('./data/iris.csv')
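
The com.databricks.spark.csv format is the external package needed on Spark 1.x; on 2.0+ the CSV reader is built in. An equivalent one-liner, as a sketch:

In [ ]:
# sketch: built-in CSV reader in Spark 2.0+ (same result as the databricks package)
data = spark.read.csv('./data/iris.csv', header=True, inferSchema=True)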

In [23]:
data.printSchema()
data.show()


root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
|         5.4|        3.7|         1.5|        0.2| setosa|
|         4.8|        3.4|         1.6|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.3|        3.0|         1.1|        0.1| setosa|
|         5.8|        4.0|         1.2|        0.2| setosa|
|         5.7|        4.4|         1.5|        0.4| setosa|
|         5.4|        3.9|         1.3|        0.4| setosa|
|         5.1|        3.5|         1.4|        0.3| setosa|
|         5.7|        3.8|         1.7|        0.3| setosa|
|         5.1|        3.8|         1.5|        0.3| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 20 rows

  • Convert the string labels to numeric values

In [24]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="species", outputCol="targetlabel")
target = indexer.fit(data).transform(data)

In [25]:
target.printSchema()
target.show()


root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)
 |-- targetlabel: double (nullable = true)

+------------+-----------+------------+-----------+-------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|species|targetlabel|
+------------+-----------+------------+-----------+-------+-----------+
|         5.1|        3.5|         1.4|        0.2| setosa|        2.0|
|         4.9|        3.0|         1.4|        0.2| setosa|        2.0|
|         4.7|        3.2|         1.3|        0.2| setosa|        2.0|
|         4.6|        3.1|         1.5|        0.2| setosa|        2.0|
|         5.0|        3.6|         1.4|        0.2| setosa|        2.0|
|         5.4|        3.9|         1.7|        0.4| setosa|        2.0|
|         4.6|        3.4|         1.4|        0.3| setosa|        2.0|
|         5.0|        3.4|         1.5|        0.2| setosa|        2.0|
|         4.4|        2.9|         1.4|        0.2| setosa|        2.0|
|         4.9|        3.1|         1.5|        0.1| setosa|        2.0|
|         5.4|        3.7|         1.5|        0.2| setosa|        2.0|
|         4.8|        3.4|         1.6|        0.2| setosa|        2.0|
|         4.8|        3.0|         1.4|        0.1| setosa|        2.0|
|         4.3|        3.0|         1.1|        0.1| setosa|        2.0|
|         5.8|        4.0|         1.2|        0.2| setosa|        2.0|
|         5.7|        4.4|         1.5|        0.4| setosa|        2.0|
|         5.4|        3.9|         1.3|        0.4| setosa|        2.0|
|         5.1|        3.5|         1.4|        0.3| setosa|        2.0|
|         5.7|        3.8|         1.7|        0.3| setosa|        2.0|
|         5.1|        3.8|         1.5|        0.3| setosa|        2.0|
+------------+-----------+------------+-----------+-------+-----------+
only showing top 20 rows
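
Why does setosa get index 2.0? By default StringIndexer orders labels by descending frequency (and the iris classes are tied at 50 rows each, so the exact order is implementation-dependent); the safest way to see the mapping is to inspect the fitted model. A sketch using the model's labels attribute:

In [ ]:
# sketch: inspect which index was assigned to which species
indexer_model = indexer.fit(data)
print(indexer_model.labels)   # labels[i] is the species string mapped to index i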


In [26]:
from pyspark.sql import Row
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

In [27]:
# convert each row into a labeled dense feature vector
def transData(row):
    return Row(label=row["targetlabel"],
               features=Vectors.dense([row["sepal_length"],
                                       row["sepal_width"],
                                       row["petal_length"],
                                       row["petal_width"]]))

In [28]:
# convert the result back to a DataFrame
# Note: in PySpark 2.0+, you must convert the DataFrame to an RDD first, then map and call toDF()
transformed = target.rdd.map(transData).toDF()
transformed.show()


+-----------------+-----+
|         features|label|
+-----------------+-----+
|[5.1,3.5,1.4,0.2]|  2.0|
|[4.9,3.0,1.4,0.2]|  2.0|
|[4.7,3.2,1.3,0.2]|  2.0|
|[4.6,3.1,1.5,0.2]|  2.0|
|[5.0,3.6,1.4,0.2]|  2.0|
|[5.4,3.9,1.7,0.4]|  2.0|
|[4.6,3.4,1.4,0.3]|  2.0|
|[5.0,3.4,1.5,0.2]|  2.0|
|[4.4,2.9,1.4,0.2]|  2.0|
|[4.9,3.1,1.5,0.1]|  2.0|
|[5.4,3.7,1.5,0.2]|  2.0|
|[4.8,3.4,1.6,0.2]|  2.0|
|[4.8,3.0,1.4,0.1]|  2.0|
|[4.3,3.0,1.1,0.1]|  2.0|
|[5.8,4.0,1.2,0.2]|  2.0|
|[5.7,4.4,1.5,0.4]|  2.0|
|[5.4,3.9,1.3,0.4]|  2.0|
|[5.1,3.5,1.4,0.3]|  2.0|
|[5.7,3.8,1.7,0.3]|  2.0|
|[5.1,3.8,1.5,0.3]|  2.0|
+-----------------+-----+
only showing top 20 rows
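
The Row-mapping above works, but the idiomatic pyspark.ml way to build a features column is VectorAssembler, which avoids the round trip through the RDD API. A sketch:

In [ ]:
# sketch: VectorAssembler builds the same dense feature column without the RDD round trip
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features")
assembled = assembler.transform(target).select("features", "targetlabel")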

Fit the k-means model


In [29]:
# fit the model and assign each row to a cluster
kmeans = KMeans(k=3)
model = kmeans.fit(transformed) 
predict_data = model.transform(transformed)
predict_data.show()


+-----------------+-----+----------+
|         features|label|prediction|
+-----------------+-----+----------+
|[5.1,3.5,1.4,0.2]|  2.0|         0|
|[4.9,3.0,1.4,0.2]|  2.0|         0|
|[4.7,3.2,1.3,0.2]|  2.0|         0|
|[4.6,3.1,1.5,0.2]|  2.0|         0|
|[5.0,3.6,1.4,0.2]|  2.0|         0|
|[5.4,3.9,1.7,0.4]|  2.0|         0|
|[4.6,3.4,1.4,0.3]|  2.0|         0|
|[5.0,3.4,1.5,0.2]|  2.0|         0|
|[4.4,2.9,1.4,0.2]|  2.0|         0|
|[4.9,3.1,1.5,0.1]|  2.0|         0|
|[5.4,3.7,1.5,0.2]|  2.0|         0|
|[4.8,3.4,1.6,0.2]|  2.0|         0|
|[4.8,3.0,1.4,0.1]|  2.0|         0|
|[4.3,3.0,1.1,0.1]|  2.0|         0|
|[5.8,4.0,1.2,0.2]|  2.0|         0|
|[5.7,4.4,1.5,0.4]|  2.0|         0|
|[5.4,3.9,1.3,0.4]|  2.0|         0|
|[5.1,3.5,1.4,0.3]|  2.0|         0|
|[5.7,3.8,1.7,0.3]|  2.0|         0|
|[5.1,3.8,1.5,0.3]|  2.0|         0|
+-----------------+-----+----------+
only showing top 20 rows


In [30]:
# count rows where the raw cluster id differs from the indexed label
train_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count()
total = predict_data.count()

In [31]:
train_err, total, 1 - float(train_err)/total, float(train_err)/total


Out[31]:
(136, 150, 0.09333333333333338, 0.9066666666666666)
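
The 136 "errors" are misleading: k-means assigns arbitrary cluster ids (0, 1, 2), which need not coincide with the StringIndexer codes, so a raw label != prediction count mostly measures index mismatch. A sketch that first maps each cluster to its majority label and then recounts (the mapping logic is illustrative, not from the original notebook):

In [ ]:
# sketch: align arbitrary cluster ids with label codes before scoring
counts = predict_data.groupBy("prediction", "label").count().collect()
best = {}                                   # prediction -> (majority label, count)
for r in counts:
    if r["prediction"] not in best or r["count"] > best[r["prediction"]][1]:
        best[r["prediction"]] = (r["label"], r["count"])
mapping = {p: lbl for p, (lbl, _) in best.items()}
err = predict_data.rdd.filter(lambda row: mapping[row["prediction"]] != row["label"]).count()
print(err, total, 1 - float(err) / total)   # accuracy after aligning cluster ids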

In [ ]: