In [ ]:
import os
import csv
import json
import numpy as np
import socket
import struct
import matplotlib.pyplot as plt
%matplotlib inline
import time
import itertools
import pandas as pd
# Unique suffix to make sure my names don't conflict with yours
MY_SUFFIX = "_" + os.getcwd().split('/')[-1]
print "MY_SUFFIX =", MY_SUFFIX
In [ ]:
import sparktk
import tap_catalog
from sparktk import TkContext
from tap_catalog import DataCatalog
print "SparkTK installation path = %s" % (sparktk.__path__)
tc = TkContext(master='yarn-client',
               extra_conf={"spark.yarn.am.memory": "3712m",
                           "spark.executor.memory": "3712m",
                           "spark.yarn.driver.memoryOverhead": "384",
                           "spark.yarn.executor.memoryOverhead": "384",
                           "spark.driver.cores": "1",
                           "spark.executor.cores": "1",
                           "spark.shuffle.io.preferDirectBufs": "false",
                           "spark.dynamicAllocation.enabled": "true",
                           "spark.shuffle.service.enabled": "true",
                           "spark.sql.shuffle.partitions": "10"})
In [ ]:
reference = MY_SUFFIX
score = reference
In [ ]:
dataset = "hdfs://nameservice1/org/29ace093-e11f-4f0b-b254-3f8e973476e5/brokers/userspace/694b3da9-c21a-4063-bf16-e072ac47f881/bae3a89e-eaad-4a8b-b1b9-f42e1d1426cc/000000_1"
week2_nf_schema = [("TimeSeconds", float),
                   ("tstart", str),
                   ("dateTimeStr", float),
                   ("protocol", str),
                   ("proto", str),
                   ("src", str),
                   ("dst", str),
                   ("sport", int),
                   ("dport", int),
                   ("flag", int),
                   ("fwd", int),
                   ("tdur", int),
                   ("firstSeenSrcPayloadBytes", int),
                   ("firstSeenDestPayloadBytes", int),
                   ("ibyt", int),
                   ("obyt", int),
                   ("ipkt", int),
                   ("opkt", int),
                   ("recordForceOut", int)]
real_netflow = tc.frame.import_csv(path=dataset, schema=week2_nf_schema, header=True)
real_netflow.inspect(wrap=10, round=4, width=100)
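In [ ]:
# Side note: socket/struct from the first cell are commonly used in netflow
# work to turn dotted-quad IPs (like src/dst above) into sortable integers.
# A minimal sketch assuming IPv4 strings; this section keeps src/dst as str.
def ip2long(ip):
    return struct.unpack("!L", socket.inet_aton(ip))[0]
print ip2long("10.0.0.1")   # 167772161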
In [ ]:
def get_time(row):
    # Split tstart ("<date> <HH:MM:SS>") into date/time strings, hour,
    # minute, and a fractional-hour value for plotting.
    x = row.tstart.split(" ")
    date = x[0]
    time_str = x[1]  # renamed from 'time' to avoid shadowing the time module
    y = time_str.split(":")
    hour = float(y[0])
    minute = float(y[1])
    numeric_time = hour + minute / 60.0
    return [date, time_str, hour, minute, numeric_time]
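In [ ]:
# Quick local sanity check of get_time outside Spark. The sample row is
# hypothetical; it only assumes tstart looks like "<date> <HH:MM:SS>".
from collections import namedtuple
FakeRow = namedtuple('FakeRow', ['tstart'])
print get_time(FakeRow(tstart="2013-04-01 08:30:00"))
# expected: ['2013-04-01', '08:30:00', 8.0, 30.0, 8.5]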
In [ ]:
real_netflow.add_columns(get_time, [('date', str),
                                    ('time', str),
                                    ('hour', float),
                                    ('minute', float),
                                    ('numeric_time', float)])
real_netflow.inspect(wrap=10, round=4, width=100,
                     columns=['date', 'time', 'hour', 'minute', 'numeric_time'])
In [ ]:
# Create the edge frame: one edge per (src, dst) pair, weighted by flow count
network_edges = real_netflow.group_by(['src', 'dst'], tc.agg.count)
# Create the vertex frame: the distinct set of IPs seen as either src or dst
src_frame = network_edges.copy()
src_frame.drop_columns('dst')
src_frame.rename_columns({"src":"id"})
dst_frame = network_edges.copy()
dst_frame.drop_columns('src')
dst_frame.rename_columns({"dst":"id"})
src_frame.append(dst_frame)
network_vertices = src_frame.group_by(['id'])
# Create a graph
network_graph = tc.graph.create(network_vertices,network_edges)
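In [ ]:
# Optional sanity check on the frames the graph was built from:
# tc.graph.create expects an 'id' column in the vertex frame and
# 'src'/'dst' columns in the edge frame (graphframes conventions).
print network_vertices.inspect()
print network_edges.inspect()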
In [ ]:
# Group by source: the per-flow ibyt sums are treated here as each source's
# outgoing bytes, hence the rename to obyt_SUM
bytes_out = real_netflow.group_by(['src'], tc.agg.count, {'ibyt': tc.agg.sum})
bytes_out.rename_columns({'ibyt_SUM': 'obyt_SUM', 'count': 'outgoing_connections'})
print bytes_out.inspect()
In [ ]:
# Group by destination; renaming dst to src gives bytes_in the same key
# column as bytes_out for the inner join below
bytes_in = real_netflow.group_by(['dst'], {'ibyt': tc.agg.sum})
bytes_in.rename_columns({'dst': 'src'})
print bytes_in.inspect()
In [ ]:
bytes_in_out = bytes_in.join_inner(bytes_out, left_on='src')
In [ ]:
bytes_in_out.add_columns(lambda row: [float(np.log(row.ibyt_SUM)),
                                      float(np.log(row.obyt_SUM))],
                         [('ln_ibyt_SUM', float), ('ln_obyt_SUM', float)])
print bytes_in_out.inspect(wrap=10, round=4, width=100)
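In [ ]:
# Caveat: np.log(0) is -inf, which would poison the SVM features later.
# If zero-byte aggregates can occur in the data, np.log1p is the safer
# transform (illustration only; the frames above keep plain np.log).
print np.log(np.array([1.0, 10.0, 100.0]))     # [ 0.      2.3026  4.6052]
print np.log1p(np.array([0.0, 10.0, 100.0]))   # [ 0.      2.3979  4.6151]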
In [ ]:
print 'Compute unweighted degree count:'
unweighted_degree_frame = network_graph.degrees('undirected')
print unweighted_degree_frame.inspect()
In [ ]:
print 'Compute weighted degree count:'
weighted_degree_frame = network_graph.weighted_degrees(edge_weight = 'count')
print weighted_degree_frame.inspect()
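In [ ]:
# Tiny arithmetic illustration (pure Python, not the sparktk API): a vertex
# with two incident edges whose 'count' weights are 3 and 5 has
# unweighted degree 2 but weighted degree 3 + 5 = 8.
edge_weights = [3, 5]
print len(edge_weights), sum(edge_weights)   # 2 8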
In [ ]:
weighted_degree_frame.rename_columns({'id': 'src', 'degree' : 'weighted_degree'})
ip_summary_frame_intermediate = bytes_in_out.join_inner(weighted_degree_frame, left_on='src')
print ip_summary_frame_intermediate.schema
In [ ]:
unweighted_degree_frame.rename_columns({'id':'src', 'degree' : 'unweighted_degree'})
ip_summary_frame = ip_summary_frame_intermediate.join_inner(unweighted_degree_frame, left_on='src')
print ip_summary_frame.schema
In [ ]:
ip_summary_frame.add_columns(lambda row: [row.ibyt_SUM + row.obyt_SUM,
                                          float(np.log(row.ibyt_SUM + row.obyt_SUM)),
                                          float(np.log(row.unweighted_degree)),
                                          float(np.log(row.weighted_degree))],
                             [('total_traffic', float),
                              ('ln_total_traffic', float),
                              ('ln_degree', float),
                              ('ln_weighted_degree', float)])
In [ ]:
ip_summary_frame.rename_columns({'src': 'ip'})
print ip_summary_frame.inspect()
In [ ]:
ip_summary_frame_pd = ip_summary_frame.to_pandas(ip_summary_frame.count())
ip_summary_frame_pd.head()
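In [ ]:
# Optional: pandas summary statistics for the downloaded frame
# (standard DataFrame.describe; nothing sparktk-specific)
ip_summary_frame_pd.describe()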
In [ ]:
outgoing_connections_bins = 100
ln_ibyt_SUM_bins = 100
ln_obyt_SUM_bins = 100
ln_total_traffic_bins = 100
weighted_degree_bins = 100
degree_bins = 100
def plot_hist(histogram):
    # Bar plot of a sparktk histogram result (equal-width bins)
    plt.bar(histogram.cutoffs[:-1], histogram.hist,
            width=histogram.cutoffs[1] - histogram.cutoffs[0])
    plt.xlim(min(histogram.cutoffs), max(histogram.cutoffs))

histograms = {}
histograms['outgoing_connections'] = ip_summary_frame.histogram(
    column_name="outgoing_connections", num_bins=outgoing_connections_bins)
histograms['ln_ibyt_SUM'] = ip_summary_frame.histogram(column_name="ln_ibyt_SUM", num_bins=ln_ibyt_SUM_bins)
histograms['ln_obyt_SUM'] = ip_summary_frame.histogram(column_name="ln_obyt_SUM", num_bins=ln_obyt_SUM_bins)
histograms['ln_total_traffic'] = ip_summary_frame.histogram(column_name="ln_total_traffic", num_bins=ln_total_traffic_bins)
histograms['ln_weighted_degree'] = ip_summary_frame.histogram(column_name="ln_weighted_degree", num_bins=weighted_degree_bins)
histograms['ln_degree'] = ip_summary_frame.histogram(column_name="ln_degree", num_bins=degree_bins)
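In [ ]:
# Alternative to the one-plot-per-cell pattern below: draw all six
# histograms in a single grid (uses only the matplotlib import above).
fig, axes = plt.subplots(3, 2, figsize=(14, 12))
for ax, name in zip(axes.ravel(), sorted(histograms)):
    h = histograms[name]
    ax.bar(h.cutoffs[:-1], h.hist, width=h.cutoffs[1] - h.cutoffs[0])
    ax.set_xlim(min(h.cutoffs), max(h.cutoffs))
    ax.set_title(name)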
In [ ]:
plot_hist(histograms['outgoing_connections'])
In [ ]:
plot_hist(histograms['ln_ibyt_SUM'])
In [ ]:
plot_hist(histograms['ln_obyt_SUM'])
In [ ]:
plot_hist(histograms['ln_total_traffic'])
In [ ]:
plot_hist(histograms['ln_weighted_degree'])
In [ ]:
plot_hist(histograms['ln_degree'])
In [ ]:
# import the scatter_matrix functionality (pandas.tools.plotting in this
# pandas version; newer pandas moved it to pandas.plotting)
from pandas.tools.plotting import scatter_matrix
# color list reused below: yellow for predicted label 0, blue for label 1
colors = ['yellow', 'blue']
# scatter matrix of the four log-scale features (all points blue for now,
# since no labels have been predicted yet)
df = pd.DataFrame(ip_summary_frame.take(ip_summary_frame.count(),
                                        columns=["ln_ibyt_SUM", "ln_obyt_SUM", "ln_degree", "ln_weighted_degree"]),
                  columns=["ln_ibyt_SUM", "ln_obyt_SUM", "ln_degree", "ln_weighted_degree"])
scatter_matrix(df, figsize=[20, 20], marker='x',
               c=df.ln_degree.apply(lambda x: colors[1]))
df.info()
In [ ]:
# Label every IP as class 1.0 (the value must be numeric to match the float
# column type; the original string '1' would not)
ip_summary_frame.add_columns(lambda row: 1.0, ("label", float))
ip_summary_frame.inspect()
In [ ]:
SVM_model = tc.models.classification.svm.train(ip_summary_frame, 'label',
                                               ['ln_ibyt_SUM', 'ln_obyt_SUM', 'ln_degree', 'ln_weighted_degree'])
In [ ]:
scored_frame = SVM_model.predict(ip_summary_frame,
                                 ['ln_ibyt_SUM', 'ln_obyt_SUM', 'ln_degree', 'ln_weighted_degree'])
In [ ]:
scored_frame_group_by = scored_frame.group_by('predicted_label', tc.agg.count)
scored_frame_group_by.inspect(wrap=10, round=4, width=100)
In [ ]:
df = pd.DataFrame(scored_frame.take(scored_frame.count(),
                                    columns=["ln_ibyt_SUM", "ln_obyt_SUM", "ln_degree", "ln_weighted_degree", "predicted_label"]),
                  columns=["ln_ibyt_SUM", "ln_obyt_SUM", "ln_degree", "ln_weighted_degree", "predicted_label"])
# scatter_matrix was imported above; colors: yellow = predicted 0, blue = predicted 1
colors = ['yellow', 'blue']
scatter_matrix(df, figsize=[20, 20], marker='x',
               c=df.predicted_label.apply(lambda x: colors[1] if x == 1 else colors[0]))
df.info()
In [ ]:
SVM_model.export_to_mar("hdfs://nameservice1/user/vcap/netflow_svm_model.mar")
In [ ]:
from tap_catalog import DataCatalog
In [ ]:
data_catalog = DataCatalog()
In [ ]:
data_catalog.add("hdfs://nameservice1/user/vcap/netflow_svm_model.mar")
In [ ]:
#import hdfsclient
#from hdfsclient import ls, mkdir, rm, mv
In [ ]:
#ls("/user/vcap/*.*")
In [ ]:
#rm("/user/vcap/netflow_svm_model.mar")