In [ ]:
import pyspark
from pyspark.sql import SparkSession
# Reuse the active Spark session/context when one already exists, so this cell
# can be re-run (or run in a kernel that already has a context) without
# failing with "Cannot run multiple SparkContexts at once".
ss = SparkSession.builder.appName("sparkSQL").getOrCreate()
sc = ss.sparkContext

In [ ]:
# file: URI of the KDD Cup data file (the name suggests the 10% sample);
# adjust the path to wherever the file lives locally.
data = "file:////path/to/recitation4/problems/kddcup.data_10_percent"
# Read the file as an RDD of text lines and mark it cached, since several
# later cells run jobs over this same RDD.
raw = sc.textFile(data).cache()

Next, we will parse each record of the KDD dataset into a local dense vector containing only its numeric features.


In [ ]:
# Peek at the first record to see the raw comma-separated layout.
raw.take(1)

In [ ]:
import numpy as np

def parse_kdd(line):
    """Convert one comma-separated KDD record into a numeric NumPy vector.

    The fields at positions 1, 2, 3 and 41 hold string values and are
    dropped; every remaining field is parsed with ``float``.

    Args:
        line: a single record as a comma-separated string.

    Returns:
        A one-dimensional ``numpy`` array with the remaining fields as floats,
        in their original order.

    Raises:
        ValueError: if any retained field cannot be parsed as a float.
    """
    # positions of the string-valued fields that must be skipped
    symbolic_indexes = (1, 2, 3, 41)
    numeric_values = []
    for position, field in enumerate(line.split(",")):
        if position in symbolic_indexes:
            continue
        numeric_values.append(float(field))
    return np.array(numeric_values)

# Apply parse_kdd to every line, giving an RDD of numeric vectors.
vector_data = raw.map(parse_kdd)

In [ ]:
from pyspark.mllib.stat import Statistics 
from math import sqrt 

# Compute column summary statistics over all numeric feature vectors.
summary = Statistics.colStats(vector_data)

# Index 0 selects the first column of each vector, which this cell reports as
# the connection duration.
# print(...) with a single argument behaves the same on Python 2 and Python 3,
# whereas the bare print statement is a SyntaxError on Python 3.
print("Duration Statistics:")
print(" Mean: {}".format(round(summary.mean()[0], 3)))
print(" St. deviation: {}".format(round(sqrt(summary.variance()[0]), 3)))
print(" Max value: {}".format(round(summary.max()[0], 3)))
print(" Min value: {}".format(round(summary.min()[0], 3)))
print(" Total value count: {}".format(summary.count()))
print(" Number of non-zero values: {}".format(summary.numNonzeros()[0]))

We are interested in preparing a classification system for attack/no attack or for different attack types. This requires us to use the label of each record together with the summary statistics, i.e. to compute the statistics separately for each label so the data can be analysed properly.


In [ ]:
# Parse a record into a tuple with the label as its zeroth index and the
# record's numeric feature vector as its first index, so that records can be
# filtered by label before summary statistics are computed.
def parse_kdd_label(line):
    """Split one KDD record into a ``(label, numeric_vector)`` pair.

    Uses the same field selection as ``parse_kdd``: the fields at positions
    1, 2, 3 and 41 are string-valued and excluded from the vector. Field 41
    is returned separately as the label.

    Args:
        line: a single record as a comma-separated string with at least
            42 fields.

    Returns:
        A tuple ``(label, vector)`` where ``label`` is the string at field 41
        and ``vector`` is a ``numpy`` array of the remaining fields as floats.

    Raises:
        IndexError: if the record has fewer than 42 fields.
        ValueError: if any retained field cannot be parsed as a float.
    """
    split = line.split(",")
    # we will keep just numeric and logical values
    # discard any string values
    symbolic_indexes = [1, 2, 3, 41]
    clean_split = [item for i, item in enumerate(split) if i not in symbolic_indexes]
    # field 41 is the label; it becomes the key of the pair
    return (split[41], np.array([float(x) for x in clean_split]))

In [ ]:
def summary_by_label(raw_data, label):
    """Compute column statistics for the records carrying a given label.

    Args:
        raw_data: RDD of raw text records; each is parsed with
            ``parse_kdd_label``.
        label: label value to keep; compared for equality against the first
            element of each parsed pair.

    Returns:
        The result of ``Statistics.colStats`` over the values of the
        matching pairs.
    """
    keyed_vectors = raw_data.map(parse_kdd_label)
    matching = keyed_vectors.filter(lambda pair: pair[0] == label)
    return Statistics.colStats(matching.values())

In [ ]:
# Labels to summarise. Each string must equal the label field of the records
# exactly (note the trailing period), because summary_by_label selects records
# with an equality comparison.
label_list = [
    "back.",
    "buffer_overflow.",
    "ftp_write.",
    "guess_passwd.",
    "imap.",
    "ipsweep.",
    "land.",
    "loadmodule.",
    "multihop.",
    "neptune.",
    "nmap.",
    "normal.",
    "perl.",
    "phf.",
    "pod.",
    "portsweep.",
    "rootkit.",
    "satan.",
    "smurf.",
    "spy.",
    "teardrop.",
    "warezclient.",
    "warezmaster.",
]

In [ ]:
# Dictionary with key = each element of label_list and value = the summary
# statistics returned by summary_by_label for that label. One call (over the
# cached `raw` RDD) is made per label.
label_summary_dict = {
    label: summary_by_label(raw, label) for label in label_list
}

In [ ]:
# Show the summary stored for the "smurf." label. print(...) with a single
# argument behaves the same on Python 2 and Python 3.
print(label_summary_dict['smurf.'])

In [ ]: