K-means in PySpark

The next machine learning method I'd like to introduce is K-means clustering. It is an unsupervised learning method in which we group the observations into K clusters (or subsets). We call it "unsupervised" because we don't have response measurements associated with the observations to help check and evaluate the model we build (though we can, of course, use other measures to evaluate clustering models).

K-means may be the simplest approach to clustering, yet it is also an elegant and efficient method. To produce the clusters, K-means requires only the number of clusters K as its input.

The idea behind K-means clustering is that a good clustering is one with the smallest achievable within-cluster variation (a measure of how different the observations within a cluster are from each other). To pursue this goal, the K-means algorithm proceeds in a "greedy" fashion.
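
To make "within-cluster variation" concrete: with Euclidean distance, the objective K-means minimizes is the total within-cluster sum of squares (a standard formulation, stated here for reference):

$$\min_{C_1,\ldots,C_K} \sum_{k=1}^{K} \sum_{x_i \in C_k} \lVert x_i - \mu_k \rVert^2$$

where $\mu_k$ is the mean of the observations assigned to cluster $C_k$.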

K-means Algorithm

1. For each observation, randomly assign it a cluster number from 1 to *K*.

2. For each of the *K* clusters, compute the cluster center. The *k*th cluster's center is the vector of feature means across all the observations belonging to the *k*th cluster.

3. Re-assign each observation to the cluster whose cluster center is closest to this observation.

4. Check if the new assignments are the same as the last iteration. If not, go to step 2; if yes, END.


An example iteration of the K-means algorithm is presented below.
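
To make those four steps runnable, here is a minimal NumPy sketch of the algorithm (purely illustrative, not MLlib's implementation; it ignores corner cases such as empty clusters):

import numpy as np

def simple_kmeans(X, K, max_iter=100, seed=0):
    """Plain K-means on an (n_samples, n_features) array X."""
    rng = np.random.RandomState(seed)
    # Step 1: randomly assign each observation to one of the K clusters.
    labels = rng.randint(0, K, size=len(X))
    for _ in range(max_iter):
        # Step 2: each cluster's center is the mean of its members.
        centers = np.array([X[labels == k].mean(axis=0) for k in range(K)])
        # Step 3: re-assign each observation to its closest center.
        dists = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
        new_labels = dists.argmin(axis=1)
        # Step 4: stop once the assignments no longer change.
        if np.array_equal(new_labels, labels):
            break
        labels = new_labels
    return labels, centers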

Now it's time to implement K-means with PySpark. I generated a dataset myself: it contains 30 observations, and I purposely made them fall into 3 groups.

Dependencies


In [1]:
from __future__ import print_function

from pyspark import SparkContext, SparkConf
from pyspark.mllib.clustering import KMeans, KMeansModel

# # http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RankingMetrics
# from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics, RankingMetrics
import numpy as np
from numpy import array
import pandas as pd

from random import randrange
from math import sqrt

In [2]:
!ls -l


total 519184
drwxrwxr-x 20 ec2-user ec2-user      4096 Jun 28 17:43 anaconda3
-rw-rw-r--  1 ec2-user ec2-user 523283080 May 30 19:24 Anaconda3-4.4.0-Linux-x86_64.sh
-rw-------  1 ec2-user ec2-user       678 Jun 28 17:53 aws_sf17ds6.pem
-rw-rw-r--  1 ec2-user ec2-user       675 Jun 29 03:33 derby.log
-rw-rw-r--  1 ec2-user ec2-user       503 Jun 29 03:32 jupyter-setup.sh
drwxrwxr-x  5 ec2-user ec2-user      4096 Jun 29 03:33 metastore_db
drwxrwxr-x  4 ec2-user ec2-user      4096 Jun 29 01:41 PICModel
drwxrwxr-x 13 ec2-user ec2-user      4096 Jun 28 17:39 spark
-rw-r--r--  1 ec2-user ec2-user     42774 Jun 28 18:50 spark-play.ipynb
-rw-rw-r--  1 ec2-user ec2-user    111400 Jun 29 03:30 spark-play-py27.ipynb
-rw-rw-r--  1 ec2-user ec2-user    473143 Jun 29 01:36 tmp-txns-no-headers.txt
-rw-rw-r--  1 ec2-user ec2-user    473183 Jun 29 01:17 tmp_txns.txt.1
-rw-r--r--  1 ec2-user ec2-user   7211502 Jun 28 18:00 transactions.csv
-rw-rw-r--  1 ec2-user ec2-user       607 Jun 28 18:58 trial-py27.ipynb

In [3]:
sc


Out[3]:
<pyspark.context.SparkContext at 0x7f863a87a890>

In [4]:
conf = SparkConf()

In [5]:
# sc = SparkContext("local", "Simple App")

In [6]:
# sc.stop(sc)
# sc.getOrCreate("local", "Simple App")

Get Numpy on the cluster

  • required for KMeans

from local machine:

# install dependencies on the cluster ==================================
flintrock run-command test-cluster 'sudo yum install -y gcc'
# have faith...flintrock takes awhile (3-5 min)
flintrock run-command test-cluster 'pip install --user numpy'

note to self: learn how to run bash scripts

GraphFrames in PySpark -----------------------------------------------------

Real Transaction Data


In [1]:
transactions = pd.read_csv('transactions.csv')
transactions['Date'] = pd.to_datetime(transactions['Date'],unit='ms') #coerce date format
transactions[:3]


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-1-a502134dd601> in <module>()
----> 1 transactions = pd.read_csv('transactions.csv')
      2 transactions['Date'] = pd.to_datetime(transactions['Date'],unit='ms') #coerce date format
      3 transactions[:3]

NameError: name 'pd' is not defined

In [8]:
print('transactions columns: ', list(transactions.columns))


transactions columns:  ['Amount $', 'Date', 'Destination', 'Source', 'Transaction ID', 'isTainted']

DataFrame --> GraphFrame

from: http://www.datareply.co.uk/blog/2016/9/20/running-graph-analytics-with-spark-graphframes-a-simple-example

Loading the Data - Nodes

We will use the pandas package for loading the dataset as a dataframe.

  • Though not explicitly necessary in this case, the code I am using below accounts for cases where the file size is too big to be loaded in one go and performs the loading operation in chunks.
  • We first load the nodes (the wallet addresses) with the following code.
  • Notice that since the .csv file contains transactions (relationships), addresses under the Source column may appear multiple times, so duplicates need to be removed to obtain unique identifiers (a small snippet for this follows the node DataFrame below).
  • Make sure to name the identifier column of the dataframe "id", otherwise you will get an exception when you try to instantiate the GraphFrame based on it.

In [9]:
#establish sql context
from pyspark.sql import SQLContext

# Instantiate an SQLContext object from the existing SparkContext
SQL_CONTEXT = SQLContext(sc)

In [10]:
nodes = pd.read_csv('transactions.csv',
                    usecols=['Source', 'isTainted'],
                    low_memory=True,
                    iterator=True,
                    chunksize=1000)

In [11]:
# Concatenate chunks into list & convert to DataFrame
nodes = pd.DataFrame(pd.concat(list(nodes), ignore_index=True))

In [12]:
# Create a Vertex DataFrame with unique ID column "id"
nodes.columns = ['id', 'isTainted']
print('nodes columns: ', list(nodes.columns))


nodes columns:  ['id', 'isTainted']
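
As noted above, Source addresses repeat across transactions, so if you want each wallet to appear exactly once as a vertex you can deduplicate on the id column before handing the frame to Spark (a small optional step that was not run in the original session):

# Optional: keep one row per wallet id so vertex ids are unique.
nodes = nodes.drop_duplicates(subset='id').reset_index(drop=True)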

In [13]:
NODES = SQL_CONTEXT.createDataFrame(nodes)
NODES.take(3)


Out[13]:
[Row(id=u'2dd13954e18508bb8b3a41d96a022be9...', isTainted=0),
 Row(id=u'7c74d3afb41e536e26948a1d2455a7c7...', isTainted=0),
 Row(id=u'50dced19b8ee41114916bf3ca894f455...', isTainted=0)]

In [14]:
# Parallelize -----------------------------------------------------
# VERTICES = sc.parallelize(nodes)

In [15]:
# data = array([observation_group_1, observation_group_2, observation_group_3]).reshape(n_in_each_group*3, 5)
# data

In [16]:
# data = sc.parallelize(data)

In [17]:
# data.getNumPartitions()

In [18]:
# # Generate the observations -----------------------------------------------------
# n_in_each_group = 10   # how many observations in each group
# n_of_feature = 5 # how many features we have for each observation

# observation_group_1=[]
# for i in range(n_in_each_group*n_of_feature):
# 	observation_group_1.append(randrange(5, 8))

# observation_group_2=[]
# for i in range(n_in_each_group*n_of_feature):
# 	observation_group_2.append(randrange(55, 58))

# observation_group_3=[]
# for i in range(n_in_each_group*n_of_feature):
# 	observation_group_3.append(randrange(105, 108))

In [19]:
# del GENES_DF_CLEAN, GENES_DF, GENES

Loading the Data - Edges

We repeat the same process as before, with the difference that now we will be importing all the columns. Strictly speaking, this step is not really necessary if one opts for loading everything in one go the first time and then SELECTing different column subsets, thus efficiently differentiating between vertices and edges. Still, I am repeating the code here for presentation convenience. Also, even though it is not strictly necessary in our case, you should make sure that once you are done with the loading process you get rid of anything that no longer needs to be kept in memory, e.g. run:

del GENES_DF_CLEAN, GENES_DF, GENES

Side note: The del statement does not directly reclaim memory. It only removes a reference, which decrements the reference count on the value; if that count drops to zero, the memory can be reclaimed. In practice, CPython reclaims the memory immediately in that case; there's no need to wait for the garbage collector to run.
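
A tiny illustration of that behaviour (plain CPython, nothing Spark-specific):

import sys

x = [0] * 1000000
y = x                      # a second reference to the same list
print(sys.getrefcount(x))  # at least 3: x, y, and the getrefcount argument
del x                      # drops one reference; the list is still alive via y
del y                      # last reference gone; CPython frees the list immediately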

Here is the code for loading the edges:


In [20]:
edges = pd.read_csv('transactions.csv',
                    usecols=['Amount $', 'Date', 'Destination', 'Source', 'Transaction ID', 'isTainted'],
                    low_memory=True,
                    iterator=True,
                    chunksize=1000)

In [21]:
# Concatenate chunks into list & convert to DataFrame
edges = pd.DataFrame(pd.concat(list(edges), ignore_index=True))

In [22]:
cols = ['Source', 'Destination', 'isTainted', 'Amount $', 'Date', 'Transaction ID']

In [23]:
edges = edges[cols]
print('edges columns: ', list(edges.columns))


edges columns:  ['Source', 'Destination', 'isTainted', 'Amount $', 'Date', 'Transaction ID']

In [24]:
# Create an Edge DataFrame with "src" and "dst" columns
edges.columns = ["src", "dst", "relationship", 'Amount $', 'Date', 'TxID']
print('edges columns: ', list(edges.columns))


edges columns:  ['src', 'dst', 'relationship', 'Amount $', 'Date', 'TxID']

In [25]:
EDGES = SQL_CONTEXT.createDataFrame(edges)
EDGES.take(3)


Out[25]:
[Row(src=u'2dd13954e18508bb8b3a41d96a022be9...', dst=u'84a0b53e1ac008b8dd0fd6212d4b7fa2...', relationship=0, Amount $=3223.9752, Date=1385240000000.0, TxID=u'b6eb8ba20df31fa74fbe7755f58c18f82a599d6bb5fa79ef670da27e793a25fd'),
 Row(src=u'7c74d3afb41e536e26948a1d2455a7c7...', dst=u'3b62a891b99969042d4e6ac8158d0a18...', relationship=0, Amount $=3708.0216, Date=1401500000000.0, TxID=u'60df3c67063e136a0c9715edcd12ae717e6f9ed492afe2140294a1f283ddfa03'),
 Row(src=u'50dced19b8ee41114916bf3ca894f455...', dst=u'3b62a891b99969042d4e6ac8158d0a18...', relationship=0, Amount $=2.48, Date=1398560000000.0, TxID=u'a6aafd3d85600844536b8a5f2c255686c33dc4969e68a45ca0978e2f00977322')]

Make the graph


In [26]:
from graphframes import *

In [27]:
type(NODES), type(EDGES)


Out[27]:
(pyspark.sql.dataframe.DataFrame, pyspark.sql.dataframe.DataFrame)

In [28]:
# Next we finally create the graph:
g = GraphFrame(NODES, EDGES)

Graph Analytics

Descriptive Statistics


In [29]:
# Queries: count vertices, edges, degree entries, and tainted wallets (isTainted = 5).
print(g.vertices.count())
print(g.edges.count())
print(g.degrees.count())
print(g.vertices.filter("isTainted = 5").count())


45117
45117
28832
543

Pull the graph features back into pandas (useful if we later resort to sklearn)


In [30]:
# Query: Get in-degree of each vertex.
print("Vertex in-Degree -----------------------------------------------------------------------")
df = g.inDegrees.sort('inDegree', ascending=False).toPandas()
transactions = transactions.merge(df,
                                  left_on='Source',
                                  right_on='id',)
transactions.head()


Vertex in-Degree -----------------------------------------------------------------------
Out[30]:
Amount $ Date Destination Source Transaction ID isTainted id inDegree
0 3223.9752 2013-11-23 20:53:20 84a0b53e1ac008b8dd0fd6212d4b7fa2... 2dd13954e18508bb8b3a41d96a022be9... b6eb8ba20df31fa74fbe7755f58c18f82a599d6bb5fa79... 0 2dd13954e18508bb8b3a41d96a022be9... 1
1 1984.0000 2013-11-21 07:46:40 c283daf07c2c146fa8947d7306279cb0... 2dd13954e18508bb8b3a41d96a022be9... 8ce957cb97826ddff443380934cb7dd297ac2684177da4... 0 2dd13954e18508bb8b3a41d96a022be9... 1
2 4094.7032 2013-11-09 18:00:00 9550de4ed8a7b13a5b5252061cdf9b63... 2dd13954e18508bb8b3a41d96a022be9... df07a3749ee5bae84817079f7a4b4cf2fcdfe0924c6526... 0 2dd13954e18508bb8b3a41d96a022be9... 1
3 3719.9752 2013-12-09 03:33:20 d0770b7769567b0d0c822203f5858689... 2dd13954e18508bb8b3a41d96a022be9... de8095f19d32a58f2395c9a62b75ec0427fc83a635f953... 0 2dd13954e18508bb8b3a41d96a022be9... 1
4 5208.0000 2014-01-03 11:53:20 09f3142d9c75be2d1fe94e9471186547... 2dd13954e18508bb8b3a41d96a022be9... a240e4c369fe4c8c6874cd9c5e5fc4113a8bc206f8bda6... 0 2dd13954e18508bb8b3a41d96a022be9... 1

In [31]:
print("Vertex out-Degree ----------------------------------------------------------------------")
df = g.outDegrees.sort('outDegree', ascending=False).toPandas()
transactions = transactions.merge(df,
                                  left_on='Source',
                                  right_on='id')
transactions.head()


Vertex out-Degree ----------------------------------------------------------------------
Out[31]:
Amount $ Date Destination Source Transaction ID isTainted id_x inDegree id_y outDegree
0 3223.9752 2013-11-23 20:53:20 84a0b53e1ac008b8dd0fd6212d4b7fa2... 2dd13954e18508bb8b3a41d96a022be9... b6eb8ba20df31fa74fbe7755f58c18f82a599d6bb5fa79... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16
1 1984.0000 2013-11-21 07:46:40 c283daf07c2c146fa8947d7306279cb0... 2dd13954e18508bb8b3a41d96a022be9... 8ce957cb97826ddff443380934cb7dd297ac2684177da4... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16
2 4094.7032 2013-11-09 18:00:00 9550de4ed8a7b13a5b5252061cdf9b63... 2dd13954e18508bb8b3a41d96a022be9... df07a3749ee5bae84817079f7a4b4cf2fcdfe0924c6526... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16
3 3719.9752 2013-12-09 03:33:20 d0770b7769567b0d0c822203f5858689... 2dd13954e18508bb8b3a41d96a022be9... de8095f19d32a58f2395c9a62b75ec0427fc83a635f953... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16
4 5208.0000 2014-01-03 11:53:20 09f3142d9c75be2d1fe94e9471186547... 2dd13954e18508bb8b3a41d96a022be9... a240e4c369fe4c8c6874cd9c5e5fc4113a8bc206f8bda6... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16

In [32]:
print("Vertex degree --------------------------------------------------------------------------")
df = g.degrees.sort('degree', ascending=False).toPandas()
transactions = transactions.merge(df,
                                  left_on='Source',
                                  right_on='id')
transactions.head()


Vertex degree --------------------------------------------------------------------------
Out[32]:
Amount $ Date Destination Source Transaction ID isTainted id_x inDegree id_y outDegree id degree
0 3223.9752 2013-11-23 20:53:20 84a0b53e1ac008b8dd0fd6212d4b7fa2... 2dd13954e18508bb8b3a41d96a022be9... b6eb8ba20df31fa74fbe7755f58c18f82a599d6bb5fa79... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16 2dd13954e18508bb8b3a41d96a022be9... 17
1 1984.0000 2013-11-21 07:46:40 c283daf07c2c146fa8947d7306279cb0... 2dd13954e18508bb8b3a41d96a022be9... 8ce957cb97826ddff443380934cb7dd297ac2684177da4... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16 2dd13954e18508bb8b3a41d96a022be9... 17
2 4094.7032 2013-11-09 18:00:00 9550de4ed8a7b13a5b5252061cdf9b63... 2dd13954e18508bb8b3a41d96a022be9... df07a3749ee5bae84817079f7a4b4cf2fcdfe0924c6526... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16 2dd13954e18508bb8b3a41d96a022be9... 17
3 3719.9752 2013-12-09 03:33:20 d0770b7769567b0d0c822203f5858689... 2dd13954e18508bb8b3a41d96a022be9... de8095f19d32a58f2395c9a62b75ec0427fc83a635f953... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16 2dd13954e18508bb8b3a41d96a022be9... 17
4 5208.0000 2014-01-03 11:53:20 09f3142d9c75be2d1fe94e9471186547... 2dd13954e18508bb8b3a41d96a022be9... a240e4c369fe4c8c6874cd9c5e5fc4113a8bc206f8bda6... 0 2dd13954e18508bb8b3a41d96a022be9... 1 2dd13954e18508bb8b3a41d96a022be9... 16 2dd13954e18508bb8b3a41d96a022be9... 17

In [33]:
transactions = transactions.drop(['id_x', 'id_y', 'id'], axis = 1)

Extra Queries - THIS IS WHERE THINGS BREAK


In [ ]:
# hits no space left on device
print("Triangle Count -------------------------------------------------------------------------")
RESULTS = g.triangleCount()
df = RESULTS.select("id", "count").toPandas()
transactions = transactions.merge(df,
                                  left_on='Source',
                                  right_on='id')
transactions.head()


Triangle Count -------------------------------------------------------------------------

In [ ]:
print("Label Propagation ----------------------------------------------------------------------")
# Convergence is not guaranteed
df = g.labelPropagation(maxIter=10).toPandas()
# transactions = transactions.merge(df,
#                                   left_on='Source',
#                                   right_on='id')

df.head()

There are two implementations of PageRank.

  • The first implementation uses the standalone GraphFrame interface and runs PageRank for a fixed number of iterations. This can be run by setting maxIter.
  • The second implementation uses the org.apache.spark.graphx.Pregel interface and runs PageRank until convergence. This can be run by setting tol. Both implementations support non-personalized and personalized PageRank, where setting a sourceId personalizes the results for that vertex.

In [23]:
# # Run PageRank algorithm the other one, and show results.
# results = g.pageRank(resetProbability=0.01, maxIter=20)
# results.vertices.select("id", "pagerank").show()


+---+-------------------+
| id|           pagerank|
+---+-------------------+
|  b| 0.2808611427228327|
|  a|               0.01|
|  c|0.27995525261339177|
+---+-------------------+


In [ ]:
# Run PageRank algorithm (takes awhile), and show results.
print("PageRank -------------------------------------------------------------------------------")
df = g.pageRank(resetProbability=0.15, tol=0.01)\
  .vertices.sort('pagerank', ascending=False).toPandas()
# transactions = transactions.merge(df,
#                                   left_on='Source',
#                                   right_on='id')

df.head()

In [ ]:
print("Find Shortest Paths w.r.t. Tainted Wallets ---------------------------------------------------")
SHORTEST_PATH = g.shortestPaths(landmarks=["5"])
df = SHORTEST_PATH.select("id", "distances").toPandas()
# transactions = transactions.merge(df,
#                                   left_on='Source',
#                                   right_on='id')
df.head()

Cluster like your life depends on it


In [35]:
print('transactions columns: ', list(transactions.columns))


transactions columns:  ['Amount $', 'Date', 'Destination', 'Source', 'Transaction ID', 'isTainted', 'inDegree', 'outDegree', 'degree']

In [36]:
cols = ['inDegree', 'outDegree', 'degree']
tmp_transactions = transactions[cols]

In [37]:
def string_to_int(value):
    try:
        return int(value)
    except ValueError:  
        return None

In [38]:
for column in tmp_transactions.columns:
    tmp_transactions[column] = tmp_transactions[column].apply(string_to_int)


/home/ec2-user/anaconda3/envs/py27/lib/python2.7/site-packages/ipykernel/__main__.py:2: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  from ipykernel import kernelapp as app

In [39]:
tmp_transactions.info()


<class 'pandas.core.frame.DataFrame'>
Int64Index: 26050 entries, 0 to 26049
Data columns (total 3 columns):
inDegree     26050 non-null int64
outDegree    26050 non-null int64
degree       26050 non-null int64
dtypes: int64(3)
memory usage: 814.1 KB

In [40]:
data = sc.parallelize(tmp_transactions)

In [41]:
data.getNumPartitions()


Out[41]:
32

In [44]:
# for MultinomialNB classification
tmp_transactions.to_csv('tmp-txns-no-headers.txt', header=False, index = True)

In [45]:
!ls


anaconda3			 metastore_db		tmp-txns-no-headers.txt
Anaconda3-4.4.0-Linux-x86_64.sh  PICModel		tmp_txns.txt
aws_sf17ds6.pem			 spark			tmp_txns.txt.1
derby.log			 spark-play.ipynb	transactions.csv
jupyter-setup.sh		 spark-play-py27.ipynb	trial-py27.ipynb

Run the K-Means algorithm -----------------------------------------------------


In [45]:
# Build the K-Means model
# the initializationMode can also be "k-means||" or set by users.
clusters = KMeans.train(data, 2, maxIterations=3, initializationMode="random")


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-45-f4ba7ef8193f> in <module>()
      1 # Build the K-Means model
      2 # the initializationMode can also be "k-means||" or set by users.
----> 3 clusters = KMeans.train(data, 2, maxIterations=3, initializationMode="random")

/home/ec2-user/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon, initialModel)
    354         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
    355                               runs, initializationMode, seed, initializationSteps, epsilon,
--> 356                               clusterInitialModel)
    357         centers = callJavaFunc(rdd.context, model.clusterCenters)
    358         return KMeansModel([c.toArray() for c in centers])

/home/ec2-user/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
    128     sc = SparkContext.getOrCreate()
    129     api = getattr(sc._jvm.PythonMLLibAPI(), name)
--> 130     return callJavaFunc(sc, api, *args)
    131 
    132 

/home/ec2-user/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124 
    125 

/home/ec2-user/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/home/ec2-user/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/home/ec2-user/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o206.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 28.0 failed 4 times, most recent failure: Lost task 31.3 in stage 28.0 (TID 1770, 172.31.37.199, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 83, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <type 'str'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
	at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:568)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:557)
	at org.apache.spark.mllib.clustering.KMeans.initRandom(KMeans.scala:334)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:254)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:209)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainKMeansModel(PythonMLLibAPI.scala:367)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 83, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <type 'str'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
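
The TypeError above hints at what went wrong (my reading, not re-run here): iterating over a pandas DataFrame yields its column names, so sc.parallelize(tmp_transactions) produced an RDD of the three header strings rather than of numeric rows, and MLlib could not convert those strings into vectors. A sketch of the fix under that assumption:

# Parallelize the numeric rows (lists of ints), not the DataFrame object itself.
data = sc.parallelize(tmp_transactions.values.tolist())
clusters = KMeans.train(data, 2, maxIterations=3, initializationMode="random")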

In [ ]:
# Collect the clustering result
result=data.map(lambda point: clusters.predict(point)).collect()
print(result)

In [ ]:
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Power Iteration Clustering -----------------------------------------------------

Columns are: id, inDegree, outDegree, degree


In [46]:
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel

# Load and parse the data
data = sc.textFile("tmp-txns-no-headers.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(',')]))
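
One thing worth double-checking: the Spark MLlib docs describe the input to PowerIterationClustering.train as an affinity matrix given as (src, dst, similarity) tuples, not as rows of per-node features, so the four CSV columns above end up being interpreted as edge data. For reference, a minimal sketch of the documented input format, with toy similarities that are not derived from the transaction data:

from pyspark.mllib.clustering import PowerIterationClustering

# Toy affinity matrix: (src, dst, similarity) tuples, as in the MLlib docs.
toy_similarities = sc.parallelize([
    (0, 1, 0.9),
    (0, 2, 0.1),
    (1, 2, 0.2),
])
toy_model = PowerIterationClustering.train(toy_similarities, k=2, maxIterations=10)
print(toy_model.assignments().collect())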

In [47]:
type(similarities)


Out[47]:
pyspark.rdd.PipelinedRDD

In [48]:
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, k=2, maxIterations=10, initMode='random')

In [49]:
results = []
assignments = model.assignments().collect()
assignments[:3]


Out[49]:
[Assignment(id=384, cluster=0),
 Assignment(id=18624, cluster=0),
 Assignment(id=9200, cluster=0)]

In [50]:
for x in assignments:
    results.append([x.id, x.cluster])

In [51]:
results[:3]


Out[51]:
[[384, 0], [18624, 0], [9200, 0]]

In [52]:
# Save and load model
# model.save(sc, "PICModel")
# sameModel = PowerIterationClusteringModel.load(sc, "PICModel")

In [53]:
results_df = pd.DataFrame(results, index=None, columns = ['id', 'cluster'])

In [54]:
# DOUBLE BRACKETS CREATES A DF
merged = transactions[['isTainted']].merge(results_df, left_index=True, right_on='id')

Model Evaluation on Local Machine

  • Evaluating the performance of a clustering algorithm is not as trivial as counting the number of errors or computing the precision and recall of a supervised classification algorithm.
  • In particular, any evaluation metric should not take the absolute values of the cluster labels into account, but rather whether the clustering defines separations of the data similar to some ground-truth set of classes (see the quick check below),
  • OR whether it satisfies some assumption such that members belonging to the same class are more similar to each other than members of different classes, according to some similarity metric.
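
A quick check of that first point, using scikit-learn's homogeneity score on a toy labelling; permuting the cluster labels leaves the score unchanged:

from sklearn.metrics import homogeneity_score

truth = [0, 0, 1, 1]
print(homogeneity_score(truth, [0, 0, 1, 1]))  # 1.0
print(homogeneity_score(truth, [1, 1, 0, 0]))  # still 1.0 -- the label values don't matter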

In [5]:
import pickle
with open("cluster-results.pkl", 'rb') as picklefile: 
    results = pickle.load(picklefile)

In [14]:
results.head(3)


Out[14]:
       isTainted  id  cluster
10085          0   0        1
11200          0   1        0
10387          0   2        0

In [11]:
def convert_to_true(value):
    if value == 5:
        return 1
    else:
        return 0

results['isTainted'] = results['isTainted'].apply(convert_to_true)

In [15]:
results[results['isTainted'] == 1].head(3)


Out[15]:
       isTainted    id  cluster
10079          1  5608        1
12253          1  5609        1
10190          1  5610        1

In [19]:
from sklearn.metrics import classification_report, homogeneity_score, completeness_score

In [18]:
y_true = results['isTainted']
y_pred = results['cluster']
target_names = ['Wallet', 'TaintedWallet']  # label 0 = clean wallet, label 1 = tainted
print(classification_report(y_true, y_pred, target_names=target_names))


               precision    recall  f1-score   support

       Wallet       1.00      1.00      1.00     22109
TaintedWallet       0.99      1.00      1.00       237

  avg / total       1.00      1.00      1.00     22346

Conditional entropy analyses on clusters


In [20]:
# homogeneity: each cluster contains only members of a single class.
homogeneity_score(y_true, y_pred)


Out[20]:
0.99119856200298073

In [23]:
# completeness: all members of a given class are assigned to the same cluster.
completeness_score(y_true, y_pred)


Out[23]:
0.98440495116631499

  • Supervised learning techniques are rigid and don't take changing patterns into account.
  • Using clustering techniques in an unsupervised manner allows for adaptive fraud detection even if fraudsters change the behaviors that led to classification in the first place.

MultinomialNB Classification - tip from Guo

  • possibly to determine feature importances? (see the sketch below)
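
One possible route, sketched below on a toy model under the assumption that MLlib's NaiveBayesModel exposes its log class priors (pi) and log class-conditional feature probabilities (theta); the relative magnitudes in theta give a rough sense of which features drive each class:

from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LabeledPoint

# Tiny toy model just to show where the learned parameters live.
toy = [LabeledPoint(0.0, [0.0, 1.0]),
       LabeledPoint(1.0, [1.0, 0.0])]
nb = NaiveBayes.train(sc.parallelize(toy))

print(nb.labels)  # class labels
print(nb.pi)      # log class priors
print(nb.theta)   # log P(feature | class), one row per class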

In [60]:
# http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.NaiveBayesModel
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector

In [66]:
data = [LabeledPoint(0.0, [0.0, 0.0]),
        LabeledPoint(0.0, [0.0, 1.0]),
        LabeledPoint(1.0, [1.0, 0.0])]

model = NaiveBayes.train(sc.parallelize(data))
model.predict(array([0.0, 1.0]))
model.predict(array([1.0, 0.0]))

model.predict(sc.parallelize([[1.0, 0.0]])).collect()


Out[66]:
[1.0]

In [67]:
sparse_data = [LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
               LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
               LabeledPoint(1.0, SparseVector(2, {0: 1.0}))]

model = NaiveBayes.train(sc.parallelize(sparse_data))
model.predict(SparseVector(2, {1: 1.0}))


Out[67]:
0.0

In [68]:
model.predict(SparseVector(2, {0: 1.0}))


Out[68]:
1.0

DOESN'T WORK: Exporting the model


In [69]:
import os, tempfile
path = tempfile.mkdtemp()

model.save(sc, path)
sameModel = NaiveBayesModel.load(sc, path)
sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0}))


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-69-b7858ab4a343> in <module>()
      2 path = tempfile.mkdtemp()
      3 model.save(sc, path)
----> 4 sameModel = NaiveBayesModel.load(sc, path)
      5 sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0}))
      6 # True

/home/ec2-user/spark/python/pyspark/mllib/classification.pyc in load(cls, sc, path)
    638         """
    639         java_model = sc._jvm.org.apache.spark.mllib.classification.NaiveBayesModel.load(
--> 640             sc._jsc.sc(), path)
    641         # Can not unpickle array.array from Pyrolite in Python3 with "bytes"
    642         py_labels = _java2py(sc, java_model.labels(), "latin1")

/home/ec2-user/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/home/ec2-user/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/home/ec2-user/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.mllib.classification.NaiveBayesModel.load.
: java.lang.UnsupportedOperationException: empty collection
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1370)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
	at org.apache.spark.mllib.util.Loader$.loadMetadata(modelSaveLoad.scala:129)
	at org.apache.spark.mllib.classification.NaiveBayesModel$.load(NaiveBayes.scala:271)
	at org.apache.spark.mllib.classification.NaiveBayesModel.load(NaiveBayes.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
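
A plausible explanation (an assumption, not verified here): tempfile.mkdtemp() creates a directory on the driver machine only, while model.save writes its parts from the executors, so on a multi-node cluster the subsequent load comes back with an empty collection. A sketch of the usual workaround, assuming the cluster can reach shared storage at a placeholder path like hdfs:///tmp/nb-model:

# Hypothetical shared path -- replace with a real HDFS/S3 location for your cluster.
shared_path = "hdfs:///tmp/nb-model"

model.save(sc, shared_path)
sameModel = NaiveBayesModel.load(sc, shared_path)
print(sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0})))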

In [ ]:
from shutil import rmtree
try:
    rmtree(path)
except OSError:
    pass

In [ ]:
from graphframes.examples import Graphs
g = Graphs(sqlContext).friends()  # Get example graph

# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show()

# More complex queries can be expressed by applying filters.
motifs.filter("b.age > 30").show()