In [1]:
import os

import pandas as pd
from pyspark import SparkConf
from pyspark import SparkContext

In [2]:
# Cluster connection settings. The master URL defaults to the original cluster
# but can be overridden with the SPARK_MASTER environment variable, so the
# notebook is not tied to one hard-coded host.
SPARK_MASTER = os.environ.get('SPARK_MASTER', 'spark://ip-172-31-9-200:7077')

# SparkConf setters return the conf object itself, so they can be chained.
conf = (SparkConf()
        .setMaster(SPARK_MASTER)
        .setAppName('spark_analytics_chpt_5')
        .set("spark.executor.memory", "10g"))
sc = SparkContext(conf=conf)

In [3]:
# Raw connection records: one comma-separated record per line (see Out[4]).
KDD_DATA_PATH = 'kddcup.data'
raw_data = sc.textFile(KDD_DATA_PATH)

In [4]:
# Peek at the first few raw records to see the field layout.
raw_data.take(5)


Out[4]:
[u'0,tcp,http,SF,215,45076,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,0,0,0.00,0.00,0.00,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,162,4528,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,236,1228,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,2,2,1.00,0.00,0.50,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,233,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,2,2,0.00,0.00,0.00,0.00,1.00,0.00,0.00,3,3,1.00,0.00,0.33,0.00,0.00,0.00,0.00,0.00,normal.',
 u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,3,3,0.00,0.00,0.00,0.00,1.00,0.00,0.00,4,4,1.00,0.00,0.25,0.00,0.00,0.00,0.00,0.00,normal.']

In [5]:
# The connection label is the last comma-separated field of each record;
# count how many records carry each label.
events = (raw_data
          .map(lambda record: record.rsplit(',', 1)[-1])
          .countByValue())

In [6]:
# Label frequencies, most common label first.
sorted(events.items(), key=lambda label_count: label_count[1], reverse=True)


Out[6]:
[(u'smurf.', 2807886),
 (u'neptune.', 1072017),
 (u'normal.', 972781),
 (u'satan.', 15892),
 (u'ipsweep.', 12481),
 (u'portsweep.', 10413),
 (u'nmap.', 2316),
 (u'back.', 2203),
 (u'warezclient.', 1020),
 (u'teardrop.', 979),
 (u'pod.', 264),
 (u'guess_passwd.', 53),
 (u'buffer_overflow.', 30),
 (u'land.', 21),
 (u'warezmaster.', 20),
 (u'imap.', 12),
 (u'rootkit.', 10),
 (u'loadmodule.', 9),
 (u'ftp_write.', 8),
 (u'multihop.', 7),
 (u'phf.', 4),
 (u'perl.', 3),
 (u'spy.', 2)]

In [7]:
from pyspark.mllib.linalg import Vectors

def to_float(s):
    """Convert ``s`` to ``float``, returning NaN when it cannot be converted.

    Only genuine conversion failures map to NaN: ``ValueError`` (a
    non-numeric string such as ``'tcp'``) and ``TypeError`` (e.g. ``None``).
    Any other exception, including ``KeyboardInterrupt``, propagates so that
    real errors are not hidden.
    """
    try:
        return float(s)
    except (ValueError, TypeError):
        return float('nan')

def clean(line):
    """Parse one raw comma-separated record into ``(label, feature_vector)``.

    Keeps field 0 and fields 4 through the second-to-last. It skips fields
    1-3, the text columns (``tcp``, ``http``, ``SF`` in the sample output),
    and the trailing label. Every kept field goes through ``to_float``, so
    an unparseable value becomes NaN instead of raising.

    Returns
    -------
    tuple
        ``(label, DenseVector)``. ``label`` is the last field verbatim
        (e.g. ``u'normal.'``, trailing dot included).
    """
    # Split once per record rather than three times: this runs on every line
    # of the input file.
    fields = line.split(',')
    values = [fields[0]] + fields[4:-1]
    label = fields[-1]
    vector = Vectors.dense([to_float(x) for x in values])
    return (label, vector)

In [8]:
# Parse every raw record into a (label, feature_vector) pair.
labels_and_data = raw_data.map(clean)

In [9]:
# Spot-check the parsed (label, vector) pairs.
labels_and_data.take(5)


Out[9]:
[(u'normal.',
  DenseVector([0.0, 215.0, 45076.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
 (u'normal.',
  DenseVector([0.0, 162.0, 4528.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0])),
 (u'normal.',
  DenseVector([0.0, 236.0, 1228.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 2.0, 1.0, 0.0, 0.5, 0.0, 0.0, 0.0, 0.0, 0.0])),
 (u'normal.',
  DenseVector([0.0, 233.0, 2032.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 3.0, 3.0, 1.0, 0.0, 0.33, 0.0, 0.0, 0.0, 0.0, 0.0])),
 (u'normal.',
  DenseVector([0.0, 239.0, 486.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 4.0, 4.0, 1.0, 0.0, 0.25, 0.0, 0.0, 0.0, 0.0, 0.0]))]

In [10]:
# Keep only the feature vectors. Cached because k-means, and each
# clustering_score call below, make repeated passes over this RDD. Without the
# cache, every pass would re-read and re-parse the raw text file.
data = labels_and_data.values().cache()

In [11]:
# Spot-check the feature vectors that will be clustered.
data.take(5)


Out[11]:
[DenseVector([0.0, 215.0, 45076.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
 DenseVector([0.0, 162.0, 4528.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
 DenseVector([0.0, 236.0, 1228.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 2.0, 1.0, 0.0, 0.5, 0.0, 0.0, 0.0, 0.0, 0.0]),
 DenseVector([0.0, 233.0, 2032.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 3.0, 3.0, 1.0, 0.0, 0.33, 0.0, 0.0, 0.0, 0.0, 0.0]),
 DenseVector([0.0, 239.0, 486.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 3.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 4.0, 4.0, 1.0, 0.0, 0.25, 0.0, 0.0, 0.0, 0.0, 0.0])]

In [12]:
from pyspark.mllib.clustering import KMeans

Cluster the feature vectors with k-means (k = 2)


In [13]:
# k-means initialisation is random. A fixed seed makes this clustering, and
# every count derived from it below, reproducible across re-runs.
# KMeans.train is a classmethod, so no KMeans instance is needed.
KMEANS_SEED = 42
model = KMeans.train(data, 2, seed=KMEANS_SEED)

In [14]:
# One centre per cluster, each in the same feature space as the vectors in `data`.
model.clusterCenters


Out[14]:
[array([  4.83401949e+01,   1.83462155e+03,   8.26203190e+02,
          5.71611720e-06,   6.48779303e-04,   7.96173468e-06,
          1.24376586e-02,   3.20510858e-05,   1.43529049e-01,
          8.08830584e-03,   6.81851124e-05,   3.67464677e-05,
          1.29349608e-02,   1.18874823e-03,   7.43095237e-05,
          1.02114351e-03,   0.00000000e+00,   4.08294086e-07,
          8.35165553e-04,   3.34973508e+02,   2.95267146e+02,
          1.77970317e-01,   1.78036989e-01,   5.76648988e-02,
          5.77299094e-02,   7.89884132e-01,   2.11796106e-02,
          2.82608101e-02,   2.32981078e+02,   1.89214283e+02,
          7.53713390e-01,   3.07109788e-02,   6.05051931e-01,
          6.46410789e-03,   1.78091184e-01,   1.77885898e-01,
          5.79276115e-02,   5.76592214e-02]),
 array([  1.09990000e+04,   0.00000000e+00,   1.30993740e+09,
          0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
          0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
          0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
          0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
          0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
          0.00000000e+00,   1.00000000e+00,   1.00000000e+00,
          0.00000000e+00,   0.00000000e+00,   1.00000000e+00,
          1.00000000e+00,   1.00000000e+00,   0.00000000e+00,
          0.00000000e+00,   2.55000000e+02,   1.00000000e+00,
          0.00000000e+00,   6.50000000e-01,   1.00000000e+00,
          0.00000000e+00,   0.00000000e+00,   0.00000000e+00,
          1.00000000e+00,   1.00000000e+00])]

In [15]:
def clusterLabelCount(label, data, clustering=None):
    """Pair a labelled point with the cluster it is assigned to.

    Parameters
    ----------
    label : str
        The record's label (e.g. ``u'normal.'``).
    data : vector
        The feature vector to assign to a cluster.
    clustering : object with a ``predict`` method, optional
        The clustering model to use. It defaults to the notebook-level
        ``model`` so existing calls keep working. Pass one explicitly to avoid
        depending on whichever ``model`` happens to be bound in the kernel.

    Returns
    -------
    tuple
        ``(cluster, label)``, suitable as a key for ``countByValue``.
    """
    if clustering is None:
        clustering = model  # notebook-level model trained in an earlier cell
    cluster = clustering.predict(data)
    return (cluster, label)

In [16]:
# Count how many records of each label land in each cluster.
cluster_label_count = (labels_and_data
                       .map(lambda pair: clusterLabelCount(*pair))
                       .countByValue())

In [17]:
# Show the (cluster, label) -> count table.
cluster_label_count


Out[17]:
defaultdict(int,
            {(0, u'back.'): 2203,
             (0, u'buffer_overflow.'): 30,
             (0, u'ftp_write.'): 8,
             (0, u'guess_passwd.'): 53,
             (0, u'imap.'): 12,
             (0, u'ipsweep.'): 12481,
             (0, u'land.'): 21,
             (0, u'loadmodule.'): 9,
             (0, u'multihop.'): 7,
             (0, u'neptune.'): 1072017,
             (0, u'nmap.'): 2316,
             (0, u'normal.'): 972781,
             (0, u'perl.'): 3,
             (0, u'phf.'): 4,
             (0, u'pod.'): 264,
             (0, u'portsweep.'): 10412,
             (0, u'rootkit.'): 10,
             (0, u'satan.'): 15892,
             (0, u'smurf.'): 2807886,
             (0, u'spy.'): 2,
             (0, u'teardrop.'): 979,
             (0, u'warezclient.'): 1020,
             (0, u'warezmaster.'): 20,
             (1, u'portsweep.'): 1})

In [18]:
# NOTE(review): `data` is an RDD here, so predict presumably returns a lazily
# evaluated RDD of cluster indices (the cell produced no output). Confirm this.
# No later code reads this global `cluster`: the other `cluster` names are
# locals inside functions. Consider removing this cell.
cluster = model.predict(data)

In [19]:
import numpy as np

def distance(a, b):
    """Return the Euclidean (L2) norm of the element-wise difference ``a - b``."""
    difference = a - b
    return np.linalg.norm(difference)

def dist_to_centroid(data, model):
    """Distance from a point to the centre of the cluster it is assigned to.

    ``model`` must provide ``predict(point)``, which returns a cluster index,
    and a ``clusterCenters`` sequence indexed by that cluster index.
    """
    assigned = model.predict(data)
    center = model.clusterCenters[assigned]
    return distance(center, data)

def clustering_score(data, k, **train_kwargs):
    """Train k-means with ``k`` clusters and return the mean point-to-centroid distance.

    Parameters
    ----------
    data : RDD of vectors
        Points to cluster. Caching it beforehand is advisable, because
        training and scoring each make passes over it.
    k : int
        Number of clusters.
    **train_kwargs
        Forwarded to ``KMeans.train`` (e.g. ``seed``, ``maxIterations``,
        ``epsilon``). This lets runs be made reproducible and tuned without
        editing this function.

    Returns
    -------
    float
        Mean Euclidean distance from each point to its assigned centroid.
    """
    model = KMeans.train(data, k, **train_kwargs)
    # Name the lambda argument `point` so it does not shadow the RDD `data`.
    distances = data.map(lambda point: dist_to_centroid(point, model))
    return distances.mean()

In [20]:
# Score the clustering for each candidate number of clusters.
K_VALUES = range(1, 4)
all_distances = [(k, clustering_score(data, k)) for k in K_VALUES]

In [21]:
# (k, mean distance to centroid) for each k tried above.
all_distances


Out[21]:
[(1, 3826.5608567244853), (2, 3423.2242956547939), (3, 2440.7204258974753)]

Try different hyperparameters: k = 100 with more runs and a tighter convergence tolerance (epsilon)


In [22]:
# Disabled experiment: k = 100 with 10 runs and epsilon = 1e-6.
# NOTE(review): the original commented-out code would not work as intended:
#   * the model returned by train() was discarded, so `model` in the next line
#     would still be the k = 2 model trained earlier;
#   * assigning the mean to `distance` would shadow the distance() helper
#     that dist_to_centroid() relies on.
# Corrected version. The `runs` argument may not be accepted by newer Spark
# releases, so confirm it against the installed version:
# model_k100 = KMeans.train(data, 100, runs=10, epsilon=1e-6)
# result = data.map(lambda point: dist_to_centroid(point, model_k100))
# score_k100 = result.mean()