In [1]:
# Load Libraries - Make sure to run this cell!
import pandas as pd
import numpy as np
import re
from collections import Counter
from sklearn import feature_extraction, tree, model_selection, metrics
from yellowbrick.classifier import ClassificationReport
from yellowbrick.classifier import ConfusionMatrix
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
In [4]:
# Load dictionary of common English words from part 1
import pickle
with open('../../data/d_common_en_words.pickle', 'rb') as f:
    d = pickle.load(f)
This worksheet is a step-by-step guide on how to detect domains that were generated using a "Domain Generation Algorithm" (DGA). We will walk you through the process of transforming raw domain strings into Machine Learning features and creating a decision tree classifier which you will use to determine whether a given domain is legit or not. Once you have implemented the classifier, the worksheet will walk you through evaluating your model.
Overview, 2 main steps:
1. Feature Engineering: transform raw domain strings into numeric Machine Learning features.
2. Machine Learning: train and evaluate a decision tree classifier on those features.
DGA - Background
"Various families of malware use domain generation
algorithms (DGAs) to generate a large number of pseudo-random
domain names to connect to a command and control (C2) server.
In order to block DGA C2 traffic, security organizations must
first discover the algorithm by reverse engineering malware
samples, then generate a list of domains for a given seed. The
domains are then either preregistered, sink-holed or published
in a DNS blacklist. This process is not only tedious, but can
be readily circumvented by malware authors. An alternative
approach to stop malware from using DGAs is to intercept DNS
queries on a network and predict whether domains are DGA
generated. Much of the previous work in DGA detection is based
on finding groupings of like domains and using their statistical
properties to determine if they are DGA generated. However,
these techniques are run over large time windows and cannot be
used for real-time detection and prevention. In addition, many of
these techniques also use contextual information such as passive
DNS and aggregations of all NXDomains throughout a network.
Such requirements are not only costly to integrate, they may not
be possible due to real-world constraints of many systems (such
as endpoint detection). An alternative to these systems is a much
harder problem: detect DGA generation on a per domain basis
with no information except for the domain name. Previous work
to solve this harder problem exhibits poor performance and many
of these systems rely heavily on manual creation of features;
a time consuming process that can easily be circumvented by
malware authors..."
[Citation: Woodbridge et al. 2016, "Predicting Domain Generation Algorithms with Long Short-Term Memory Networks"]
A better alternative for real-world deployment would be to use "featureless deep learning" - we have a separate notebook where you can see how this can be implemented!
However, let's learn the basics first!!!
In [3]:
df_final = pd.read_csv('../../data/dga_features_final_df.csv')
print(df_final.isDGA.value_counts())
df_final.head()
Out[3]:
To learn simple classification procedures using sklearn we have split the workflow into 5 steps.
Tasks:
- Create the target vector containing the URL labels (the dga/legit values from the isDGA column) and name it 'target'.
- Drop the isDGA column from the DataFrame and name the resulting pandas DataFrame 'feature_matrix' (see the sketch after the expected output below).
In [ ]:
#Your code here...
print('Final features', feature_matrix.columns)
print( feature_matrix.head() )
You should get the following result.
Final features Index(['length', 'digits', 'entropy', 'vowel-cons', 'ngrams'], dtype='object')
length digits entropy vowel-cons ngrams
0 13 0 3.546594 0.300000 968.076729
1 25 10 3.833270 0.250000 481.067222
2 12 0 2.855389 0.090909 1036.365657
3 26 6 3.844107 0.052632 708.328718
4 12 0 3.084963 0.090909 897.543434
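One minimal way to produce this result (a sketch, assuming the labels live in the isDGA column of df_final as loaded above):
# Sketch: separate labels from features
target = df_final['isDGA']
feature_matrix = df_final.drop(['isDGA'], axis=1)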
Scikit-learn has a function, model_selection.train_test_split(), to split your data into training and testing datasets. It is HIGHLY recommended to use the built-in function rather than splitting the data yourself, as it shuffles the rows and gives you an unbiased split.
Use a test size of 0.25 and a random state of 33.
Tasks:
- Split feature_matrix and target into four datasets: feature_matrix_train, feature_matrix_test, target_train, and target_test (see the sketch below).
In [ ]:
# Your code here...
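A minimal sketch, using the train_test_split parameters given above:
# Sketch: 75/25 train/test split with a fixed random state for reproducibility
feature_matrix_train, feature_matrix_test, target_train, target_test = model_selection.train_test_split(
    feature_matrix, target, test_size=0.25, random_state=33)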
Take a quick look at the feature and target sets so that you understand what they contain. Also verify that your datasets are the correct size.
In [ ]:
print( feature_matrix_train.count() ) #Should be 1500
print( feature_matrix_test.count() ) #Should be 500
Finally, we have prepared and segmented the data. Let's start classifying!!
Tasks:
- Create a decision tree classifier with tree.DecisionTreeClassifier() and train it by calling its .fit() function with the feature_matrix_train and target_train data.
If you are interested in trying a real unknown domain, you'll have to create a function to generate the features for that domain before you run it through the classifier (see the function is_dga a few cells below).
In [6]:
# Train the decision tree based on the entropy criterion
clf = tree.DecisionTreeClassifier(criterion='entropy')  # clf means classifier
clf = clf.fit(feature_matrix_train, target_train)
# Extract a row from the test data
test_feature = feature_matrix_test[192:193]
test_target = target_test[192:193]
# Make the prediction
pred = clf.predict(test_feature)
print('Predicted class:', pred)
print('Accurate prediction?', pred[0] == test_target.iloc[0])
In [7]:
# For simplicity let's just copy the needed function in here again
def H_entropy(x):
    # Calculate Shannon entropy of the string x
    prob = [float(x.count(c)) / len(x) for c in dict.fromkeys(list(x))]
    H = -sum([p * np.log2(p) for p in prob])
    return H
def vowel_consonant_ratio(x):
    # Calculate vowel to consonant ratio
    x = x.lower()
    vowels_pattern = re.compile('([aeiou])')
    consonants_pattern = re.compile('([b-df-hj-np-tv-z])')
    vowels = re.findall(vowels_pattern, x)
    consonants = re.findall(consonants_pattern, x)
    try:
        ratio = len(vowels) / len(consonants)
    except ZeroDivisionError:  # no consonants in the string
        ratio = 0
    return ratio
# ngrams: Implementation according to Schiavoni 2014: "Phoenix: DGA-based Botnet Tracking and Intelligence"
# http://s2lab.isg.rhul.ac.uk/papers/files/dimva2014.pdf
def ngrams(word, n):
    # Extract all ngrams and return a regular Python list
    # Input word: can be a simple string or a list of strings
    # Input n: can be one integer or a list of integers
    # if you want to extract multiple ngrams and have them all in one list
    l_ngrams = []
    if isinstance(word, list):
        for w in word:
            if isinstance(n, list):
                for curr_n in n:
                    l_ngrams.extend([w[i:i+curr_n] for i in range(0, len(w)-curr_n+1)])
            else:
                l_ngrams.extend([w[i:i+n] for i in range(0, len(w)-n+1)])
    else:
        if isinstance(n, list):
            for curr_n in n:
                l_ngrams.extend([word[i:i+curr_n] for i in range(0, len(word)-curr_n+1)])
        else:
            l_ngrams.extend([word[i:i+n] for i in range(0, len(word)-n+1)])
    return l_ngrams
def ngram_feature(domain, d, n):
    # Input is your domain string or list of domain strings,
    # a dictionary object d that contains the counts for the most common english words,
    # and n, the ngram length, as an int
    # Core magic: looks up domain ngrams in the english dictionary ngrams and sums up the
    # respective english dictionary counts for the respective domain ngrams;
    # the sum is normalized by the number of ngrams in the domain
    l_ngrams = ngrams(domain, n)
    count_sum = 0
    for ngram in l_ngrams:
        if ngram in d:  # membership test, so a plain dict does not raise KeyError
            count_sum += d[ngram]
    try:
        feature = count_sum / (len(domain) - n + 1)
    except ZeroDivisionError:
        feature = 0
    return feature
def average_ngram_feature(l_ngram_feature):
    # Input is a list of results from ngram_feature(domain, d, n),
    # usually for various n values, like 1, 2, 3...
    return sum(l_ngram_feature) / len(l_ngram_feature)
In [8]:
def is_dga(domain, clf, d):
    # Takes a new domain string, the trained model 'clf' and the
    # dictionary d of most common english words as input;
    # returns the prediction
    domain_features = np.empty([1, 5])
    # order of features is ['length', 'digits', 'entropy', 'vowel-cons', 'ngrams']
    domain_features[0, 0] = len(domain)
    pattern = re.compile('([0-9])')
    domain_features[0, 1] = len(re.findall(pattern, domain))
    domain_features[0, 2] = H_entropy(domain)
    domain_features[0, 3] = vowel_consonant_ratio(domain)
    domain_features[0, 4] = average_ngram_feature([ngram_feature(domain, d, 1),
                                                   ngram_feature(domain, d, 2),
                                                   ngram_feature(domain, d, 3)])
    pred = clf.predict(domain_features)
    return pred[0]
print('Prediction for domain %s [0 means legit and 1 dga]:' % 'spardeingeld', is_dga('spardeingeld', clf, d))
print('Prediction for domain %s [0 means legit and 1 dga]:' % 'google', is_dga('google', clf, d))
print('Prediction for domain %s [0 means legit and 1 dga]:' % '1vxznov16031kjxneqjk1rtofi6', is_dga('1vxznov16031kjxneqjk1rtofi6', clf, d))
print('Prediction for domain %s [0 means legit and 1 dga]:' % 'lthmqglxwmrwex', is_dga('lthmqglxwmrwex', clf, d))
Tasks:
- Make a prediction on the test data portion. Run the .predict() method on the clf with your test data feature_matrix_test and store the results in a variable called target_pred.
- Use sklearn's metrics.accuracy_score to determine your model's accuracy. Detailed instruction: call metrics.accuracy_score with target_test (the true labels/ground truth) AND your model's predictions on the test portion, target_pred, as inputs. The advantage here is to see how your model performs on new data it has not seen during the training phase; this held-out evaluation is the fair approach (a full cross-validation short-cut follows further below).
- Print out the confusion matrix using metrics.confusion_matrix.
You also might want to try out metrics.classification_report() on your data. In one command you can get:
precision recall f1-score support
legit 0.87 0.86 0.86 256
dga 0.85 0.87 0.86 244
avg / total 0.86 0.86 0.86 500
In [ ]:
# fair approach: make prediction on test data portion
#Your code here...
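A minimal sketch of the fair evaluation (assuming the train/test variables from above):
# Sketch: predict on held-out data, then score against the ground truth
target_pred = clf.predict(feature_matrix_test)
print('Accuracy:', metrics.accuracy_score(target_test, target_pred))
print(metrics.confusion_matrix(target_test, target_pred))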
In [ ]:
# Classification Report...neat summary
#Your code here...
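A sketch that produces the summary shown above (assuming target_pred from the previous cell):
# Sketch: precision/recall/f1 per class in one command
print(metrics.classification_report(target_test, target_pred))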
In [ ]:
# short-cut
clf.score(feature_matrix_test, target_test)
Repeat this process using the training data. Is the accuracy higher or lower? Why do you think that is the case?
In [ ]:
#Your code here...
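For comparison, a minimal sketch scoring on the training portion:
# Sketch: accuracy on data the tree has already seen
print('Training accuracy:', clf.score(feature_matrix_train, target_train))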
Use Yellowbrick to visualize the confusion matrix and classification report.
In [ ]:
#Your code here...
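A sketch using the Yellowbrick visualizers imported at the top (note: older Yellowbrick versions use .poof() instead of .show()):
# Sketch: visual confusion matrix
cm_viz = ConfusionMatrix(clf)
cm_viz.fit(feature_matrix_train, target_train)
cm_viz.score(feature_matrix_test, target_test)
cm_viz.show()
# Sketch: visual classification report
report_viz = ClassificationReport(clf)
report_viz.fit(feature_matrix_train, target_train)
report_viz.score(feature_matrix_test, target_test)
report_viz.show()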
Short-cut: all of these steps can be easily achieved by simply using sklearn's model_selection.KFold() and model_selection.cross_val_score() functions.
In [12]:
cvKFold = model_selection.KFold(n_splits=3, shuffle=True, random_state=33)
cvKFold.get_n_splits(feature_matrix)
Out[12]:
3
In [13]:
scores = model_selection.cross_val_score(clf, feature_matrix, target, cv=cvKFold)
print(scores)
In [14]:
# Get average score +/- standard error (https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.sem.html)
from scipy.stats import sem
def mean_score(scores):
    return "Mean score: {0:.3f} (+/- {1:.3f})".format(np.mean(scores), sem(scores))
print( mean_score( scores))
As an optional step, you can actually visualize your tree. The following code will generate a graph of your decision tree. You will need graphviz (http://www.graphviz.org) and pydotplus (or pydot) installed for this to work. The Griffon VM has these installed already, but if you try this on a Mac or Linux machine you will need to install graphviz.
In [15]:
# These libraries are used to visualize the decision tree and require that you have GraphViz
# and pydot or pydotplus installed on your computer.
from io import StringIO  # sklearn.externals.six was removed in newer scikit-learn versions
from IPython.core.display import Image
import pydotplus as pydot
dot_data = StringIO()
tree.export_graphviz(clf, out_file=dot_data, feature_names=['length', 'digits', 'entropy', 'vowel-cons', 'ngrams'])
graph = pydot.graph_from_dot_data(dot_data.getvalue())
Image(graph.create_png())
Out[15]: (rendered image of the decision tree)
In [ ]:
#First we'll need to import the models...
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
Next, you are going to create an SVM classifier and repeat the process using this model. The code to create an SVM classifier is:
svm_classifier = svm.SVC()
svm_classifier = svm_classifier.fit(<features>, <target>)
Which model is better, the SVM or decision tree?
In [ ]:
#Your code here...
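A minimal sketch, trained and scored on the same splits as the decision tree:
# Sketch: fit an SVM on the training portion, score on the test portion
svm_classifier = svm.SVC()
svm_classifier = svm_classifier.fit(feature_matrix_train, target_train)
print('SVM accuracy:', svm_classifier.score(feature_matrix_test, target_test))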
Scikit-learn also has a random forest classifier which is very popular and effective in data science competitions. You can read more about Random Forest here (https://en.wikipedia.org/wiki/Random_forest) as well as the scikit-learn implementation here: (http://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html).
Random Forest is an ensemble classifier and exposes more tuning parameters when creating the classifier; however, coding-wise it functions exactly the same as other classifiers.
For this exercise, train a random forest classifier as follows on your training data and evaluate the results as you did for the SVM and decision tree classifier.
random_forest_clf = RandomForestClassifier(n_estimators=10,
max_depth=None,
min_samples_split=2,
random_state=0)
random_forest_clf = random_forest_clf.fit( <features>, <target>)
Which classifier produced the best results?
In [ ]:
#Your code here...
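A minimal sketch following the template above:
# Sketch: fit the random forest on the training portion, score on the test portion
random_forest_clf = RandomForestClassifier(n_estimators=10,
                                           max_depth=None,
                                           min_samples_split=2,
                                           random_state=0)
random_forest_clf = random_forest_clf.fit(feature_matrix_train, target_train)
print('Random forest accuracy:', random_forest_clf.score(feature_matrix_test, target_test))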
The following code prints out a nicer looking confusion matrix. Code from: http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html#sphx-glr-auto-examples-model-selection-plot-confusion-matrix-py.
You can call it as follows:
# Compute confusion matrix
cnf_matrix = metrics.confusion_matrix(target_test, target_pred)
np.set_printoptions(precision=2)
# Plot non-normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=['Not Malicious', 'Malicious'],
title='RF Confusion matrix, without normalization')
# Plot normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=['Not Malicious', 'Malicious'], normalize=True,
title='RF Normalized confusion matrix')
plt.show()
In [ ]:
import itertools  # required for itertools.product below

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if normalize:
        # Normalize each row (true class) so entries are proportions
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')