Import the necessary libraries and connect to the server


In [ ]:
import os
import csv
import json
import numpy as np
import socket, struct
import matplotlib.pyplot as plt
%matplotlib inline
import time
import itertools
import pandas as pd

# Unique suffix to make sure my names don't conflict with yours
MY_SUFFIX = "_" + os.getcwd().split('/')[-1] 

print "MY_SUFFIX =",MY_SUFFIX

Import the sparktk and catalog libraries


In [ ]:
import sparktk
import tap_catalog
from sparktk import TkContext
from tap_catalog import DataCatalog

print "SparkTK installation path = %s" % (sparktk.__path__)

tc = TkContext(master='yarn-client', extra_conf={"spark.yarn.am.memory":"3712m", 
                                                   "spark.executor.memory":"3712m", 
                                                   "spark.yarn.driver.memoryOverhead": "384",
                                                   "spark.yarn.executor.memoryOverhead": "384",
                                                   "spark.driver.cores": "1",
                                                   "spark.executor.cores":"1",
                                                   "spark.shuffle.io.preferDirectBufs":"false",
                                                   "spark.dynamicAllocation.enabled": "true",
                                                   "spark.shuffle.service.enabled": "true",
                                                   "spark.sql.shuffle.partitions": "10"})

Cleanup objects prior to next run


In [ ]:
# Per-user names for this run's objects, derived from MY_SUFFIX
reference = MY_SUFFIX
score = reference

Now, read in the data


In [ ]:
dataset = "hdfs://nameservice1/org/29ace093-e11f-4f0b-b254-3f8e973476e5/brokers/userspace/694b3da9-c21a-4063-bf16-e072ac47f881/bae3a89e-eaad-4a8b-b1b9-f42e1d1426cc/000000_1"
week2_nf_schema=[("TimeSeconds", float),
                 ("tstart", str),
                 ("dateTimeStr", float),
                 ("protocol", str),
                 ("proto", str),
                 ("src", str),
                 ("dst", str),
                 ("sport", int),
                 ("dport", int),
                 ("flag", int),
                 ("fwd", int),
                 ("tdur", int),
                 ("firstSeenSrcPayloadBytes", int),
                 ("firstSeenDestPayloadBytes", int),
                 ("ibyt", int),
                 ("obyt", int),
                 ("ipkt", int),
                 ("opkt", int),
                 ("recordForceOut", int)]

real_netflow = tc.frame.import_csv(path=dataset, schema=week2_nf_schema, header=True)

real_netflow.inspect(wrap=10, round=4, width=100)
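
It is also worth checking the row count right after the import, since a schema mismatch typically shows up as far fewer rows than expected. A minimal check using the frame's count() method:


In [ ]:
# Quick sanity check on the import: how many netflow records came in?
print "Rows imported: %d" % real_netflow.count()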

Add Columns for date/time


In [ ]:
def get_time(row):
    # split tstart (e.g. "<date> <hh:mm:ss>") into its date and time-of-day parts
    x = row.tstart.split(" ")
    date = x[0]
    time = x[1]
    y = time.split(":")
    hour = float(y[0])
    minute = float(y[1])
    # fractional hour of day, e.g. 08:45 -> 8.75
    numeric_time = hour + minute/60.0
    return [date, time, hour, minute, numeric_time]
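
Before applying get_time to the whole frame, a quick local sanity check can confirm the parsing. The Row namedtuple below is a hypothetical stand-in for a frame row, and the timestamp value is made up for illustration.


In [ ]:
from collections import namedtuple

# Hypothetical stand-in for a frame row; only the tstart field matters here
Row = namedtuple("Row", ["tstart"])
sample = Row(tstart="2013-04-03 08:45:12")

# Expect: ['2013-04-03', '08:45:12', 8.0, 45.0, 8.75]
print get_time(sample)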

In [ ]:
real_netflow.add_columns(get_time, [('date',str), 
                                    ('time',str), 
                                    ('hour', float), 
                                    ('minute', float), 
                                    ('numeric_time', float)])

real_netflow.inspect(wrap=10, round=4, width=100,
                     columns=['date', 'time', 'hour', 'minute', 'numeric_time'])

Build a Graph for computing graph statistics


In [ ]:
# Create the edge frame: one edge per distinct (src, dst) pair, weighted by flow count
network_edges = real_netflow.group_by(['src', 'dst'], tc.agg.count)

# Create the vertex frame: the set of distinct IPs appearing as either source or destination
src_frame = network_edges.copy()
src_frame.drop_columns('dst')
src_frame.rename_columns({"src": "id"})
dst_frame = network_edges.copy()
dst_frame.drop_columns('src')
dst_frame.rename_columns({"dst": "id"})
src_frame.append(dst_frame)
network_vertices = src_frame.group_by(['id'])   # group_by with no aggregation deduplicates the ids

# Create a graph from the vertex and edge frames
network_graph = tc.graph.create(network_vertices, network_edges)
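
Before computing graph statistics, it can help to check how large the graph actually is. This is a minimal sanity check on the frames that back the graph; the numbers will of course depend on your data.


In [ ]:
# How many distinct IPs (vertices) and distinct src->dst pairs (edges) went into the graph
print "Vertices: %d" % network_vertices.count()
print "Edges:    %d" % network_edges.count()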

Compute bytes in and bytes out


In [ ]:
# Group by source IP: the summed ibyt over a source's flows is treated here as its
# outgoing bytes, and the group count as its number of outgoing connections
bytes_out = real_netflow.group_by(['src'], tc.agg.count, {'ibyt': tc.agg.sum})
bytes_out.rename_columns({'ibyt_SUM': 'obyt_SUM', 'count': 'outgoing_connections'})
print bytes_out.inspect()

In [ ]:
# Group by destination IP to get bytes received, then rename dst to src so the
# two frames share a key column for the join below
bytes_in = real_netflow.group_by(['dst'], {'ibyt': tc.agg.sum})
bytes_in.rename_columns({'dst': 'src'})
print bytes_in.inspect()

In [ ]:
bytes_in_out = bytes_in.join_inner(bytes_out, left_on='src')

In [ ]:
bytes_in_out.add_columns(lambda row: [float(np.log(row.ibyt_SUM)), float(np.log(row.obyt_SUM))], 
                         [('ln_ibyt_SUM', float), ('ln_obyt_SUM', float)])

print bytes_in_out.inspect(wrap=10, round=4, width=100)
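
The natural-log columns help because byte totals like these are typically heavily skewed: a few hosts move orders of magnitude more traffic than the rest, and the log compresses that range into something the later histograms, scatter plots, and SVM features can work with. A small standalone illustration with numpy (the byte totals below are made up):


In [ ]:
# Made-up byte totals spanning several orders of magnitude
sample_bytes = np.array([1.2e3, 4.5e5, 9.8e8])
print np.log(sample_bytes)   # roughly [ 7.09  13.02  20.70] -- a far narrower range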

Compute Weighted/Unweighted Degree Counts


In [ ]:
print 'Compute unweighted degree count:'
unweighted_degree_frame = network_graph.degrees('undirected')
print unweighted_degree_frame.inspect()

In [ ]:
print 'Compute weighted degree count:'
weighted_degree_frame = network_graph.weighted_degrees(edge_weight = 'count')

print weighted_degree_frame.inspect()

Compute Summary frame and Download it to Pandas


In [ ]:
weighted_degree_frame.rename_columns({'id': 'src', 'degree' : 'weighted_degree'})
ip_summary_frame_intermediate = bytes_in_out.join_inner(weighted_degree_frame, left_on='src')

print ip_summary_frame_intermediate.schema

In [ ]:
unweighted_degree_frame.rename_columns({'id':'src', 'degree' : 'unweighted_degree'})
ip_summary_frame = ip_summary_frame_intermediate.join_inner(unweighted_degree_frame, left_on='src')

print ip_summary_frame.schema

In [ ]:
ip_summary_frame.add_columns(lambda row: [row.ibyt_SUM + row.obyt_SUM, 
                                          float(np.log(row.ibyt_SUM + row.obyt_SUM)),
                                          float(np.log(row.unweighted_degree)),
                                          float(np.log(row.weighted_degree))], 
                             [('total_traffic', float),
                              ('ln_total_traffic', float),
                              ('ln_degree', float),
                              ('ln_weighted_degree', float)])

In [ ]:
ip_summary_frame.rename_columns({'src': 'ip'})

print ip_summary_frame.inspect()

In [ ]:
ip_summary_frame_pd = ip_summary_frame.to_pandas(ip_summary_frame.count())
ip_summary_frame_pd.head()
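
As a quick follow-up, pandas describe() summarizes the downloaded frame and makes obviously bad values (NaNs from the log transform, unexpected negatives, and so on) easy to spot before plotting.


In [ ]:
ip_summary_frame_pd.describe()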

Compute histogram bins


In [ ]:
outgoing_connections_bins = 100
ln_ibyt_SUM_bins = 100
ln_obyt_SUM_bins = 100
ln_total_traffic_bins = 100
weighted_degree_bins = 100
degree_bins = 100

def plot_hist(histogram):
    # bins are uniform, so the width of the first bin applies to every bar
    plt.bar(histogram.cutoffs[:-1], histogram.hist, width=histogram.cutoffs[1] - histogram.cutoffs[0])
    plt.xlim(min(histogram.cutoffs), max(histogram.cutoffs))

histograms = {}
histograms['outgoing_connections'] = ip_summary_frame.histogram(
    column_name = "outgoing_connections", num_bins=outgoing_connections_bins)
histograms['ln_ibyt_SUM'] = ip_summary_frame.histogram(column_name = "ln_ibyt_SUM", num_bins=ln_ibyt_SUM_bins)
histograms['ln_obyt_SUM'] = ip_summary_frame.histogram(column_name = "ln_obyt_SUM", num_bins=ln_obyt_SUM_bins)
histograms['ln_total_traffic'] = ip_summary_frame.histogram(column_name = "ln_total_traffic", num_bins=ln_total_traffic_bins)
histograms['ln_weighted_degree'] = ip_summary_frame.histogram(column_name = "ln_weighted_degree", num_bins=weighted_degree_bins)
histograms['ln_degree'] = ip_summary_frame.histogram(column_name = "ln_degree", num_bins=degree_bins)

Plot histograms


In [ ]:
plot_hist(histograms['outgoing_connections'])

In [ ]:
plot_hist(histograms['ln_ibyt_SUM'])

In [ ]:
plot_hist(histograms['ln_obyt_SUM'])

In [ ]:
plot_hist(histograms['ln_total_traffic'])

In [ ]:
plot_hist(histograms['ln_weighted_degree'])

In [ ]:
plot_hist(histograms['ln_degree'])
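
The same histograms can also be rendered as a single grid of subplots instead of one figure per cell. This is just a convenience sketch that reuses the cutoffs/hist fields from the histogram results computed above.


In [ ]:
# Optional: draw all six histograms in one figure
fig, axes = plt.subplots(2, 3, figsize=(18, 8))
for ax, (name, histogram) in zip(axes.ravel(), sorted(histograms.items())):
    ax.bar(histogram.cutoffs[:-1], histogram.hist,
           width=histogram.cutoffs[1] - histogram.cutoffs[0])
    ax.set_xlim(min(histogram.cutoffs), max(histogram.cutoffs))
    ax.set_title(name)
plt.tight_layout()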

Make a scatter plot


In [ ]:
# import the scatter_matrix functionality
from pandas.tools.plotting import scatter_matrix

# define a colors list; every point is drawn in the same color here since no class label exists yet
colors = ['yellow', 'blue']

# make a scatter plot of the log-scaled features
df = pd.DataFrame(ip_summary_frame.take(ip_summary_frame.count(),
                                        columns=["ln_ibyt_SUM", "ln_obyt_SUM", "ln_degree", "ln_weighted_degree"]),
                  columns=["ln_ibyt_SUM", "ln_obyt_SUM", "ln_degree", "ln_weighted_degree"])

scatter_matrix(df, figsize=[20, 20], marker='x',
               c=df.ln_degree.apply(lambda x: colors[1]))

df.info()

Create and train an SVM model


In [ ]:
# Every IP gets the same label (1.0); the SVM is trained on a single class here
ip_summary_frame.add_columns(lambda row: 1.0, ("label", float))
ip_summary_frame.inspect()

In [ ]:
SVM_model = tc.models.classification.svm.train(ip_summary_frame, 'label', 
                ['ln_ibyt_SUM', 'ln_obyt_SUM', 'ln_degree', 'ln_weighted_degree'])

In [ ]:
scored_frame = SVM_model.predict(ip_summary_frame, 
                                 ['ln_ibyt_SUM', 'ln_obyt_SUM', 'ln_degree', 'ln_weighted_degree'])

In [ ]:
scored_frame_group_by = scored_frame.group_by('predicted_label', tc.agg.count)

scored_frame_group_by.inspect(wrap=10, round=4, width=100)
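
If the grouped counts show a small minority class, those rows are the natural candidates for a closer look. The sketch below assumes the minority predictions come back as label 0 (check the group-by output above first) and uses the frame's filter() method; filter modifies a frame in place, so it is applied to a copy.


In [ ]:
# Pull out the rows the model scored as the minority class and inspect them
outliers = scored_frame.copy()
outliers.filter(lambda row: row.predicted_label == 0)
outliers.inspect(wrap=10, round=4, width=100)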

Download the scored frame to Pandas for a scatter plot


In [ ]:
df = pd.DataFrame(scored_frame.take(scored_frame.count(),
                                    columns=["ln_ibyt_SUM", "ln_obyt_SUM", "ln_degree", "ln_weighted_degree", "predicted_label"]),
                  columns=["ln_ibyt_SUM", "ln_obyt_SUM", "ln_degree", "ln_weighted_degree", "predicted_label"])

# import the scatter_matrix functionality
from pandas.tools.plotting import scatter_matrix

# define a colors list: points predicted as label 1 are drawn blue, everything else yellow
colors = ['yellow', 'blue']

# make a scatter plot colored by the predicted label
scatter_matrix(df, figsize=[20, 20], marker='x',
               c=df.predicted_label.apply(lambda x: (colors[1] if x == 1 else colors[0])))

df.info()

Export the trained model in MAR format


In [ ]:
SVM_model.export_to_mar("hdfs://nameservice1/user/vcap/netflow_svm_model.mar")

Import Data Catalog client module from tap_catalog


In [ ]:
from tap_catalog import DataCatalog

Create an instance of Data Catalog. The cell below uses the interactive form; for scripting, pass the credentials explicitly instead:

data_catalog = DataCatalog('TAP_DOMAIN_URI', 'TAP_USERNAME', 'TAP_PASSWORD')  # scripting form


In [ ]:
data_catalog = DataCatalog()

In [ ]:
data_catalog.add("hdfs://nameservice1/user/vcap/netflow_svm_model.mar")

Only run the lines below to remove files/models from HDFS (cleanup)

Inspect HDFS directly using hdfsclient


In [ ]:
#import hdfsclient
#from hdfsclient import ls, mkdir, rm, mv

In [ ]:
#ls("/user/vcap/*.*")

Cleanup the file from HDFS

(This does not delete the entry from the Data Catalog; remember to delete it from the Data Catalog UI as well.)


In [ ]:
#rm("/user/vcap/netflow_svm_model.mar")
