In [1]:
from pyspark import SparkConf
from pyspark import SparkContext
import pandas as pd
In [2]:
# Build the Spark configuration as one fluent chain (each setter hands back the
# same SparkConf), then start the context the rest of the notebook uses as `sc`.
conf = (
    SparkConf()
    .setMaster('spark://ip-172-31-9-200:7077')
    .setAppName('spark_analytics_chpt_5')
    .set("spark.executor.memory", "10g")
)
sc = SparkContext(conf=conf)
In [3]:
# Load 'kddcup.data' through the Spark context; each element is one line of text.
raw_data = sc.textFile('kddcup.data')
In [4]:
# Peek at the first five raw lines to check the record layout.
raw_data.take(5)
Out[4]:
In [5]:
# Tally how many records carry each value of the last comma-separated field.
events = (
    raw_data
    .map(lambda line: line.split(',')[-1])
    .countByValue()
)
In [6]:
# Show the tallies ordered from the most frequent value to the least frequent.
sorted(
    events.items(),
    key=lambda item: item[1],
    reverse=True
)
Out[6]:
In [7]:
from pyspark.mllib.linalg import Vectors
def to_float(s):
    """Convert ``s`` to a float, falling back to NaN when it is not numeric.

    Args:
        s: Value to convert; ``clean`` passes one comma-separated field at a time.

    Returns:
        ``float(s)`` when the conversion succeeds, otherwise ``float('nan')``.
    """
    try:
        return float(s)
    except (TypeError, ValueError):
        # Only conversion failures become NaN. A bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit, hiding a user interrupt.
        return float('nan')
def clean(line):
    """Split one comma-separated record into a ``(label, feature_vector)`` pair.

    The label is the last field. The features are the first field followed by
    fields 4 up to (but not including) the last one, so fields 1-3 are left
    out. Every kept field is converted with ``to_float`` before being packed
    into a dense vector.

    Args:
        line: One comma-separated record as a string.

    Returns:
        A tuple ``(label, vector)``.
    """
    fields = line.split(',')
    label = fields[-1]
    kept = fields[:1] + fields[4:-1]
    features = Vectors.dense([to_float(field) for field in kept])
    return (label, features)
In [8]:
# Parse every raw line into the (label, vector) pair produced by clean().
labels_and_data = raw_data.map(clean)
In [9]:
# Check the first five parsed (label, vector) pairs.
labels_and_data.take(5)
Out[9]:
In [10]:
# Keep only the second element of each (label, vector) pair, i.e. the vectors.
data = labels_and_data.values()
In [11]:
# Check the first five feature vectors.
data.take(5)
Out[11]:
In [12]:
from pyspark.mllib.clustering import KMeans
Run with k-means clustering
In [13]:
# Fit k-means on the feature vectors with k=2; `model` is the module-level
# model that clusterLabelCount() reads.
kmeans = KMeans()
model = kmeans.train(data, 2)
In [14]:
# Inspect the cluster centers of the fitted model.
model.clusterCenters
Out[14]:
In [15]:
def clusterLabelCount(label, data):
    """Pair a record's label with the cluster the module-level ``model`` predicts.

    Args:
        label: Label of the record.
        data: Feature vector handed to ``model.predict``.

    Returns:
        A ``(cluster, label)`` tuple.
    """
    return (model.predict(data), label)
In [16]:
# Count how often each (cluster, label) combination occurs across all records.
cluster_label_count = (
    labels_and_data
    .map(lambda pair: clusterLabelCount(pair[0], pair[1]))
    .countByValue()
)
In [17]:
# Display the (cluster, label) counts.
cluster_label_count
Out[17]:
In [18]:
# Run the fitted model's predict() over the whole `data` RDD.
# NOTE(review): this module-level `cluster` does not appear to be used by the
# later cells shown here (the helpers below bind their own local `cluster`).
cluster = model.predict(data)
In [19]:
import numpy as np
def distance(a, b):
    """Return the norm of ``a - b`` as computed by ``np.linalg.norm``.

    Args:
        a: First operand of the subtraction.
        b: Second operand of the subtraction.

    Returns:
        ``np.linalg.norm(a - b)``.
    """
    difference = a - b
    return np.linalg.norm(difference)
def dist_to_centroid(data, model):
    """Return the distance between ``data`` and the centroid of its predicted cluster.

    Args:
        data: A single feature vector.
        model: Fitted model providing ``predict`` and ``clusterCenters``.

    Returns:
        ``distance(centroid, data)`` for the centroid at the predicted index.
    """
    assigned = model.predict(data)
    return distance(model.clusterCenters[assigned], data)
def clustering_score(data, k):
    """Train k-means with ``k`` clusters and return the mean point-to-centroid distance.

    Args:
        data: RDD of feature vectors; used both for training and for scoring.
        k: Number of clusters passed to ``KMeans.train``.

    Returns:
        The mean of ``dist_to_centroid`` over every element of ``data``.
    """
    fitted = KMeans().train(data, k)
    distances = data.map(lambda point: dist_to_centroid(point, fitted))
    return distances.mean()
In [20]:
# Score k = 1, 2 and 3 (range(1, 4) stops before 4), pairing each k with its
# clustering_score; every k trains a separate model.
all_distances = [(k, clustering_score(data, k)) for k in range(1, 4)]
In [21]:
# Display the (k, score) pairs.
all_distances
Out[21]:
Run with different hyperparameters, k=100
In [22]:
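# Left disabled. Two fixes are applied relative to the first draft of this snippet:
#   * the retrained model is assigned, otherwise the next line would score the
#     earlier `model` instead of the k=100 one;
#   * the mean is not stored in `distance`, which would shadow the distance()
#     helper that dist_to_centroid() calls.
# model = kmeans.train(data, 100, runs=10, epsilon=1e-6)
# result = data.map(lambda data: dist_to_centroid(data, model))
# mean_distance = result.mean()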