K-means in PySpark

The next machine learning method I'd like to introduce is a clustering method: K-means. It is an unsupervised learning method in which we group the observations into K clusters (or subsets). We call it "unsupervised" because the observations come with no associated response measurements to check and evaluate the fitted model against (although we can still use other measures to evaluate a clustering model).

K-means may be the simplest approach to clustering, yet it is also an elegant and efficient method. To produce the clusters, K-means only requires the number of clusters K as its input.

The idea behind K-means clustering is that a good clustering is one with the smallest possible within-cluster variation (a measure of how different the observations within a cluster are from each other). To pursue this goal, the K-means algorithm is designed in a "greedy" fashion.
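To make "within-cluster variation" concrete, here is a small NumPy sketch (the function name and the toy points are made up for illustration) that measures it as the sum of squared distances of each observation from its cluster's mean:

import numpy as np

def within_cluster_variation(points, labels, k):
    """Sum of squared Euclidean distances from each observation to its cluster mean."""
    total = 0.0
    for c in range(k):
        members = points[labels == c]
        if len(members) > 0:
            total += ((members - members.mean(axis=0)) ** 2).sum()
    return total

# toy example: two tight groups -> small within-cluster variation
pts = np.array([[0.0, 0.0], [0.1, 0.2], [5.0, 5.0], [5.2, 4.9]])
print(within_cluster_variation(pts, np.array([0, 0, 1, 1]), 2))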

K-means Algorithm

1. Assign to each observation a random number drawn from 1 to *K*.

2. For each of the *K* clusters, compute the cluster center: the *k*th cluster's center is the element-wise mean of the feature vectors of all observations assigned to the *k*th cluster.

3. Re-assign each observation to the cluster whose center is closest to it.

4. Check whether the new assignments are the same as in the previous iteration. If not, go back to step 2; if they are, stop.


An example of how these iterations proceed is sketched below.
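As a rough, self-contained illustration of the four steps (this is not the MLlib implementation; the function simple_kmeans and the toy array are made up for this post):

import numpy as np

def simple_kmeans(points, k, max_iter=100, seed=0):
    """Toy K-means following the four steps above (random initial assignment)."""
    rng = np.random.RandomState(seed)
    labels = rng.randint(0, k, size=len(points))        # step 1: random assignment
    # fallback centers (random observations) in case a cluster is ever empty
    centers = points[rng.choice(len(points), size=k, replace=False)].astype(float)
    for _ in range(max_iter):
        for c in range(k):                              # step 2: recompute centers
            members = points[labels == c]
            if len(members) > 0:
                centers[c] = members.mean(axis=0)
        dists = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        new_labels = dists.argmin(axis=1)               # step 3: re-assign
        if np.array_equal(new_labels, labels):          # step 4: converged?
            break
        labels = new_labels
    return labels, centers

toy = np.array([[1.0, 1.0], [1.2, 0.8], [8.0, 8.0], [7.9, 8.2]])
print(simple_kmeans(toy, 2))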

Now it's time to implement K-means with PySpark. I generate a dataset myself: it contains 30 observations, and I purposely "made" them fall into 3 groups.

Dependencies


In [11]:
# from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

# http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.NaiveBayesModel
from pyspark.mllib.classification import NaiveBayesModel

# http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RankingMetrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics, RankingMetrics
import numpy as np
import pandas as pd
from random import randrange
from math import sqrt

In [12]:
!ls -l


total 518108
drwxrwxr-x 20 ec2-user ec2-user      4096 Jun 28 17:43 anaconda3
-rw-rw-r--  1 ec2-user ec2-user 523283080 May 30 19:24 Anaconda3-4.4.0-Linux-x86_64.sh
-rw-------  1 ec2-user ec2-user       678 Jun 28 17:53 aws_sf17ds6.pem
-rw-rw-r--  1 ec2-user ec2-user       674 Jun 28 18:31 derby.log
-rw-rw-r--  1 ec2-user ec2-user       460 Jun 28 18:30 jupyter-setup.sh
drwxrwxr-x  5 ec2-user ec2-user      4096 Jun 28 18:31 metastore_db
drwxrwxr-x 13 ec2-user ec2-user      4096 Jun 28 17:39 spark
-rw-r--r--  1 ec2-user ec2-user     12986 Jun 28 18:35 spark-play.ipynb
-rw-r--r--  1 ec2-user ec2-user   7211502 Jun 28 18:00 transactions.csv

In [2]:
sc


Out[2]:
<pyspark.context.SparkContext at 0x7fde8c5ac4e0>

In [5]:
# sc = SparkContext("local", "Simple App")

In [6]:
# sc.stop(sc)
# sc.getOrCreate("local", "Simple App")

KMeans Fake Data


In [7]:
# Generate the observations -----------------------------------------------------
n_in_each_group = 10   # how many observations in each group
n_of_feature = 5 # how many features we have for each observation

In [8]:
observation_group_1=[]
for i in range(n_in_each_group*n_of_feature):
	observation_group_1.append(randrange(5, 8))

observation_group_2=[]
for i in range(n_in_each_group*n_of_feature):
	observation_group_2.append(randrange(55, 58))

observation_group_3=[]
for i in range(n_in_each_group*n_of_feature):
	observation_group_3.append(randrange(105, 108))

In [9]:
data = np.array([observation_group_1, observation_group_2, observation_group_3]).reshape(n_in_each_group*3, 5)
data = sc.parallelize(data)
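Before training, a quick sanity check of the RDD might look like this (output not shown):

# Confirm 30 observations with 5 features each
print(data.count())
print(data.take(2))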

Run the K-means Algorithm


In [13]:
# Build the K-Means model
# the initializationMode can also be "k-means||" or set by users.
clusters = KMeans.train(data, 3, maxIterations=10, initializationMode="random")


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-13-34da217693ea> in <module>()
      1 # Build the K-Means model
      2 # the initializationMode can also be "k-means||" or set by users.
----> 3 clusters = KMeans.train(data, 3, maxIterations=10, initializationMode="random")

/home/ec2-user/spark/python/pyspark/mllib/clustering.py in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon, initialModel)
    354         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
    355                               runs, initializationMode, seed, initializationSteps, epsilon,
--> 356                               clusterInitialModel)
    357         centers = callJavaFunc(rdd.context, model.clusterCenters)
    358         return KMeansModel([c.toArray() for c in centers])

/home/ec2-user/spark/python/pyspark/mllib/common.py in callMLlibFunc(name, *args)
    128     sc = SparkContext.getOrCreate()
    129     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130     return callJavaFunc(sc, api, *args)
    131 
    132 

/home/ec2-user/spark/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124 
    125 

/home/ec2-user/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/home/ec2-user/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/home/ec2-user/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o35.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent failure: Lost task 4.3 in stage 0.0 (TID 26, 172.31.43.251, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 125, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
	at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:568)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:557)
	at org.apache.spark.mllib.clustering.KMeans.initRandom(KMeans.scala:334)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:254)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:209)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainKMeansModel(PythonMLLibAPI.scala:367)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 125, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
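The root cause is buried near the top of the traceback: the Spark workers are running Python 2.7 while the driver (this notebook) runs Python 3.6, and PySpark cannot mix minor versions. As the message itself suggests, the fix is to point PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON at the same Python 3 interpreter on every node (for example in spark-env.sh or the script used to launch the cluster) and then restart Spark; the exact interpreter path depends on your installation. The cells below assume the training call succeeds.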

In [ ]:
# Collect the clustering result
result=data.map(lambda point: clusters.predict(point)).collect()
print(result)

In [ ]:
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

In [ ]:
# GraphFrames example; assumes the graphframes package is installed and a
# SQLContext is available as `sqlContext`.
from graphframes import GraphFrame

# Create a Vertex DataFrame with unique ID column "id"
v = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)

# Query: Get in-degree of each vertex.
g.inDegrees.show()

# Query: Count the number of "follow" connections in the graph.
g.edges.filter("relationship = 'follow'").count()

# Run PageRank algorithm, and show results.
results = g.pageRank(resetProbability=0.01, maxIter=20)
results.vertices.select("id", "pagerank").show()

Initial Classification - tip from guo


In [ ]:
# Imports needed for this example (missing from the dependencies cell above)
from numpy import array
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Note: this reuses the name `data`, shadowing the K-means RDD defined earlier
data = [LabeledPoint(0.0, [0.0, 0.0]),
        LabeledPoint(0.0, [0.0, 1.0]),
        LabeledPoint(1.0, [1.0, 0.0])]

model = NaiveBayes.train(sc.parallelize(data))
model.predict(array([0.0, 1.0]))
model.predict(array([1.0, 0.0]))

model.predict(sc.parallelize([[1.0, 0.0]])).collect()

sparse_data = [LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
               LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
               LabeledPoint(1.0, SparseVector(2, {0: 1.0}))]

model = NaiveBayes.train(sc.parallelize(sparse_data))
model.predict(SparseVector(2, {1: 1.0}))

model.predict(SparseVector(2, {0: 1.0}))

In [ ]:
import os, tempfile
path = tempfile.mkdtemp()
model.save(sc, path)
sameModel = NaiveBayesModel.load(sc, path)
sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0}))
# True

In [ ]:
from shutil import rmtree
try:
    rmtree(path)
except OSError:
    pass
