The next machine learning method I'd like to introduce is K-means clustering. It is an unsupervised learning method in which we group the observations into K clusters (or subsets). We call it "unsupervised" because we don't have response measurements associated with the observations to help check and evaluate the model we build (although we can use other measures to evaluate clustering models).
K-means may be the simplest approach to clustering, yet it is also an elegant and efficient method. To produce the clusters, K-means only requires the number of clusters K as its input.
The idea behind K-means clustering is that a good clustering is one whose within-cluster variation (a measure of how different the observations within a cluster are from each other) is as small as possible. To achieve this, the K-means algorithm proceeds in a "greedy" fashion.
K-means Algorithm
1. Assign each observation a random cluster label from 1 to *K*.
2. For each of the *K* clusters, compute the cluster center: the *k*th cluster's center is the vector of feature means over all observations currently assigned to the *k*th cluster.
3. Re-assign each observation to the cluster whose center is closest to it.
4. Check whether the new assignments are the same as in the last iteration. If not, go to step 2; if yes, stop.
An example iteration of the K-means algorithm is presented below.
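As a concrete illustration of these four steps, here is a minimal NumPy sketch (my own addition, operating on a small in-memory array X; it is not the MLlib implementation used later in this notebook):
import numpy as np

def kmeans_sketch(X, K, seed=0):
    rng = np.random.RandomState(seed)
    # Step 1: assign each observation a random cluster label (0..K-1 here).
    labels = rng.randint(0, K, size=len(X))
    while True:
        # Step 2: each cluster's center is the mean of its observations
        # (if a cluster happens to be empty, re-seed it with a random observation).
        centers = np.array([X[labels == k].mean(axis=0) if np.any(labels == k)
                            else X[rng.randint(len(X))] for k in range(K)])
        # Step 3: re-assign each observation to its closest center.
        dists = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
        new_labels = dists.argmin(axis=1)
        # Step 4: stop when the assignments no longer change.
        if np.array_equal(new_labels, labels):
            return labels, centers
        labels = new_labels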
Now it's time to implement K-means with PySpark. I generated a dataset myself: it contains 30 observations, and I purposely generated them so that they form 3 groups.
Troubleshooting: to get the dependencies for Python 2.7 locally:
conda install numpy
conda install pandas
To install numpy on the worker nodes, see the flintrock commands further down.
In [1]:
from __future__ import print_function
from pyspark import SparkContext, SparkConf
from pyspark.mllib.clustering import KMeans, KMeansModel
# http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.evaluation.RankingMetrics
# from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics, RankingMetrics
import numpy as np
from numpy import array
import pandas as pd
from random import randrange
from math import sqrt
In [2]:
!ls -l
In [3]:
sc
Out[3]:
In [4]:
conf = SparkConf()
In [5]:
# sc = SparkContext("local", "Simple App")
In [6]:
# sc.stop()
# sc = SparkContext.getOrCreate(conf)
From the local machine:
# install dependencies on the cluster ==================================
flintrock run-command test-cluster 'sudo yum install -y gcc'
# have faith... flintrock takes a while (3-5 min)
flintrock run-command test-cluster 'pip install --user numpy'
Note to self: learn how to run bash scripts.
In [1]:
transactions = pd.read_csv('transactions.csv')
transactions['Date'] = pd.to_datetime(transactions['Date'],unit='ms') #coerce date format
transactions[:3]
In [8]:
print('transactions columns: ', list(transactions.columns))
We will use the pandas package for loading the dataset as a dataframe.
The OFFICIAL_SYMBOL_A header may appear multiple times, so duplicates need to be removed to obtain unique identifiers (a minimal deduplication sketch appears just before the vertex DataFrame is created below).
In [9]:
# Establish the SQL context
from pyspark.sql import SQLContext
# Instantiate a SQLContext object
SQL_CONTEXT = SQLContext(sc)
In [10]:
nodes = pd.read_csv('transactions.csv',
                    usecols=['Source', 'isTainted'],
                    low_memory=True,
                    iterator=True,
                    chunksize=1000)
In [11]:
# Concatenate chunks into list & convert to DataFrame
nodes = pd.DataFrame(pd.concat(list(nodes), ignore_index=True))
In [12]:
# Create a Vertex DataFrame with unique ID column "id"
nodes.columns = ['id', 'isTainted']
print('nodes columns: ', list(nodes.columns))
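As promised above, a minimal deduplication sketch (my own addition; each vertex id should appear only once before the table is handed to Spark):
# Keep the first row per vertex id so identifiers are unique
nodes = nodes.drop_duplicates(subset=['id']).reset_index(drop=True)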
In [13]:
NODES = SQL_CONTEXT.createDataFrame(nodes)
NODES.take(3)
Out[13]:
In [14]:
# Parallelize -----------------------------------------------------
# VERTICES = sc.parallelize(nodes)
In [15]:
# data = array([observation_group_1, observation_group_2, observation_group_3]).reshape(n_in_each_group*3, 5)
# data
In [16]:
# data = sc.parallelize(data)
In [17]:
# data.getNumPartitions()
In [18]:
# # Generate the observations -----------------------------------------------------
# n_in_each_group = 10  # how many observations in each group
# n_of_feature = 5  # how many features we have for each observation
# observation_group_1 = []
# for i in range(n_in_each_group * n_of_feature):
#     observation_group_1.append(randrange(5, 8))
# observation_group_2 = []
# for i in range(n_in_each_group * n_of_feature):
#     observation_group_2.append(randrange(55, 58))
# observation_group_3 = []
# for i in range(n_in_each_group * n_of_feature):
#     observation_group_3.append(randrange(105, 108))
In [19]:
# del GENES_DF_CLEAN, GENES_DF, GENES
We repeat the same process as before, with the difference that now we will import all six columns. Strictly speaking, this step is not necessary if you load everything in one go the first time and then SELECT different column-dataframes, thereby efficiently differentiating between vertices and edges. Still, I am repeating the code here for presentation convenience. Also, even though it is not strictly necessary in our case, you should make sure that once you are done with the loading process you get rid of anything that no longer needs to stay in memory, e.g. run:
del GENES_DF_CLEAN, GENES_DF, GENES
Side note: the del statement does not directly reclaim memory. It only removes a reference, which decrements the reference count on the value; if that count drops to zero, the memory can be reclaimed. In that case CPython reclaims the memory immediately, so there is no need to wait for the garbage collector to run.
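As a small, Spark-independent illustration of this reference-counting behavior (my own sketch):
import sys

big_list = list(range(10**6))
alias = big_list                  # a second reference to the same list
print(sys.getrefcount(big_list))  # counts both names plus the temporary argument reference

del alias                         # drops one reference; the list is still alive
print(sys.getrefcount(big_list))

del big_list                      # the count reaches zero and CPython frees the memory immediately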
Here is the code for loading the edges:
In [20]:
edges = pd.read_csv('transactions.csv',
                    usecols=['Amount $', 'Date', 'Destination', 'Source', 'Transaction ID', 'isTainted'],
                    low_memory=True,
                    iterator=True,
                    chunksize=1000)
In [21]:
# Concatenate chunks into list & convert to DataFrame
edges = pd.DataFrame(pd.concat(list(edges), ignore_index=True))
In [22]:
cols = ['Source', 'Destination', 'isTainted', 'Amount $', 'Date', 'Transaction ID']
In [23]:
edges = edges[cols]
print('edges columns: ', list(edges.columns))
In [24]:
# Create an Edge DataFrame with "src" and "dst" columns
edges.columns = ["src", "dst", "relationship", 'Amount $', 'Date', 'TxID']
print('edges columns: ', list(edges.columns))
In [25]:
EDGES = SQL_CONTEXT.createDataFrame(edges)
EDGES.take(3)
Out[25]:
In [26]:
from graphframes import *
In [27]:
type(NODES), type(EDGES)
Out[27]:
In [28]:
# Next we finally create the graph:
g = GraphFrame(NODES, EDGES)
In [29]:
# Query: count vertices, edges, and degree entries, plus the number of tainted wallets in the graph.
print(g.vertices.count())
print(g.edges.count())
print(g.degrees.count())
print(g.vertices.filter("isTainted = 5").count())
In [30]:
# Query: Get in-degree of each vertex.
print("Vertex in-Degree -----------------------------------------------------------------------")
df = g.inDegrees.sort('inDegree', ascending=False).toPandas()
transactions = transactions.merge(df,
                                  left_on='Source',
                                  right_on='id')
transactions.head()
Out[30]:
In [31]:
print("Vertex out-Degree ----------------------------------------------------------------------")
df = g.outDegrees.sort('outDegree', ascending=False).toPandas()
transactions = transactions.merge(df,
                                  left_on='Source',
                                  right_on='id')
transactions.head()
Out[31]:
In [32]:
print("Vertex degree --------------------------------------------------------------------------")
df = g.degrees.sort('degree', ascending=False).toPandas()
transactions = transactions.merge(df,
                                  left_on='Source',
                                  right_on='id')
transactions.head()
Out[32]:
In [33]:
transactions = transactions.drop(['id_x', 'id_y', 'id'], axis = 1)
In [ ]:
# hits no space left on device
print("Triangle Count -------------------------------------------------------------------------")
RESULTS = g.triangleCount()
df = RESULTS.select("id", "count").toPandas()
transactions = transactions.merge(df,
                                  left_on='Source',
                                  right_on='id')
transactions.head()
In [ ]:
print("Label Propagation ----------------------------------------------------------------------")
# Convergence is not guaranteed
df = g.labelPropagation(maxIter=10).toPandas()
# transactions = transactions.merge(df,
# left_on='Source',
# right_on='id')
df.head()
In [23]:
# # Run PageRank algorithm the other one, and show results.
# results = g.pageRank(resetProbability=0.01, maxIter=20)
# results.vertices.select("id", "pagerank").show()
In [ ]:
# Run PageRank algorithm (takes awhile), and show results.
print("PageRank -------------------------------------------------------------------------------")
df = g.pageRank(resetProbability=0.15, tol=0.01)\
.vertices.sort('pagerank', ascending=False).toPandas()
# transactions = transactions.merge(df,
# left_on='Source',
# right_on='id')
df.head()
In [ ]:
print("Find Shortest Paths w.r.t. Tainted Wallets ---------------------------------------------------")
SHORTEST_PATH = g.shortestPaths(landmarks=["5"])
df = SHORTEST_PATH.select("id", "distances").toPandas()
# transactions = transactions.merge(df,
# left_on='Source',
# right_on='id')
df.head()
In [35]:
print('transactions columns: ', list(transactions.columns))
In [36]:
cols = ['inDegree', 'outDegree', 'degree']
tmp_transactions = transactions[cols]
In [37]:
def string_to_int(value):
    try:
        return int(value)
    except ValueError:
        return None
In [38]:
for column in tmp_transactions.columns:
    tmp_transactions[column] = tmp_transactions[column].apply(string_to_int)
In [39]:
tmp_transactions.info()
In [40]:
data = sc.parallelize(tmp_transactions.values)  # parallelize the feature rows (not the column names)
In [41]:
data.getNumPartitions()
Out[41]:
In [44]:
# for MultinomialNB classification
tmp_transactions.to_csv('tmp-txns-no-headers.txt', header=False, index = True)
In [45]:
!ls
In [45]:
# Build the K-Means model
# the initializationMode can also be "k-means||" or set by users.
clusters = KMeans.train(data, 2, maxIterations=3, initializationMode="random")
In [ ]:
# Collect the clustering result
result = data.map(lambda point: clusters.predict(point)).collect()
print(result)
In [ ]:
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
The columns in tmp-txns-no-headers.txt are: id, inDegree, outDegree, degree.
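One caveat before the next cell: PowerIterationClustering in pyspark.mllib is defined on an affinity matrix, i.e. an RDD of (src, dst, similarity) tuples, rather than on raw feature rows. A hedged sketch of building such an input from the degree features (my own choice of a Gaussian similarity over all pairs; this is not what the cells below do) could look like:
# Sketch only: build (i, j, s_ij) affinity tuples from the feature rows.
# Assumes tmp_transactions is the small pandas DataFrame of degree features;
# enumerating all pairs is only feasible for a small sample.
import numpy as np
from itertools import combinations

feats = tmp_transactions.fillna(0).values.astype(float)
pairs = []
for i, j in combinations(range(len(feats)), 2):
    s_ij = float(np.exp(-np.linalg.norm(feats[i] - feats[j]) ** 2 / 2.0))  # Gaussian similarity
    pairs.append((i, j, s_ij))

affinity = sc.parallelize(pairs)
# model = PowerIterationClustering.train(affinity, k=2, maxIterations=10, initMode='random')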
In [46]:
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# Load and parse the data
data = sc.textFile("tmp-txns-no-headers.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(',')]))
In [47]:
type(similarities)
Out[47]:
In [48]:
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, k=2, maxIterations=10, initMode='random')
In [49]:
results = []
assignments = model.assignments().collect()
assignments[:3]
Out[49]:
In [50]:
for x in assignments:
    results.append([x.id, x.cluster])
In [51]:
results[:3]
Out[51]:
In [52]:
# Save and load model
# model.save(sc, "PICModel")
# sameModel = PowerIterationClusteringModel.load(sc, "PICModel")
In [53]:
results_df = pd.DataFrame(results, index=None, columns = ['id', 'cluster'])
In [54]:
# Double brackets select a one-column DataFrame (not a Series)
merged = transactions[['isTainted']].merge(results_df, left_index=True, right_on='id')
In [5]:
import pickle
with open("cluster-results.pkl", 'rb') as picklefile:
results = pickle.load(picklefile)
In [14]:
results.head(3)
Out[14]:
In [11]:
def convert_to_true(value):
    if value == 5:
        return 1
    else:
        return 0
results['isTainted'] = results['isTainted'].apply(convert_to_true)
In [15]:
results[results['isTainted'] == 1].head(3)
Out[15]:
In [19]:
from sklearn.metrics import classification_report, homogeneity_score, completeness_score
In [18]:
y_true = results['isTainted']
y_pred = results['cluster']
target_names = ['TaintedWallet', 'Wallet']
print(classification_report(y_true, y_pred, target_names=target_names))
Conditional entropy analysis of the clusters
In [20]:
# homogeneity: each cluster contains only members of a single class.
homogeneity_score(y_true, y_pred)
Out[20]:
In [23]:
# completeness: all members of a given class are assigned to the same cluster.
completeness_score(y_true, y_pred)
Out[23]:
In [60]:
# http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#pyspark.mllib.classification.NaiveBayesModel
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector
In [66]:
data = [LabeledPoint(0.0, [0.0, 0.0]),
        LabeledPoint(0.0, [0.0, 1.0]),
        LabeledPoint(1.0, [1.0, 0.0])]
model = NaiveBayes.train(sc.parallelize(data))
model.predict(array([0.0, 1.0]))
model.predict(array([1.0, 0.0]))
model.predict(sc.parallelize([[1.0, 0.0]])).collect()
Out[66]:
In [67]:
sparse_data = [LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
               LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
               LabeledPoint(1.0, SparseVector(2, {0: 1.0}))]
model = NaiveBayes.train(sc.parallelize(sparse_data))
model.predict(SparseVector(2, {1: 1.0}))
Out[67]:
In [68]:
model.predict(SparseVector(2, {0: 1.0}))
Out[68]:
In [69]:
import os, tempfile
path = tempfile.mkdtemp()
model.save(sc, path)
sameModel = NaiveBayesModel.load(sc, path)
sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0}))
In [ ]:
from shutil import rmtree
try:
    rmtree(path)
except OSError:
    pass
In [ ]:
from graphframes.examples import Graphs
g = Graphs(SQL_CONTEXT).friends()  # Get example graph (re-uses the SQLContext created earlier)
# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show()
# More complex queries can be expressed by applying filters.
motifs.filter("b.age > 30").show()