In [3]:
# Load Libraries - Make sure to run this cell!
import pandas as pd
import numpy as np
import re
from collections import Counter
from sklearn import feature_extraction, tree, model_selection, metrics
from yellowbrick.classifier import ClassificationReport
from yellowbrick.classifier import ConfusionMatrix
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
This worksheet is a step-by-step guide on how to detect domains that were generated using a "Domain Generation Algorithm" (DGA). We will walk you through the process of transforming raw domain strings into Machine Learning features and creating a decision tree classifier which you will use to determine whether a given domain is legitimate or DGA-generated. Once you have implemented the classifier, the worksheet will walk you through evaluating your model.
Overview, 2 main steps:
1. Feature Engineering - transform the raw domain strings into numeric Machine Learning features.
2. Machine Learning - train a classifier on those features and evaluate it.
DGA - Background
"Various families of malware use domain generation
algorithms (DGAs) to generate a large number of pseudo-random
domain names to connect to a command and control (C2) server.
In order to block DGA C2 traffic, security organizations must
first discover the algorithm by reverse engineering malware
samples, then generate a list of domains for a given seed. The
domains are then either preregistered, sink-holed or published
in a DNS blacklist. This process is not only tedious, but can
be readily circumvented by malware authors. An alternative
approach to stop malware from using DGAs is to intercept DNS
queries on a network and predict whether domains are DGA
generated. Much of the previous work in DGA detection is based
on finding groupings of like domains and using their statistical
properties to determine if they are DGA generated. However,
these techniques are run over large time windows and cannot be
used for real-time detection and prevention. In addition, many of
these techniques also use contextual information such as passive
DNS and aggregations of all NXDomains throughout a network.
Such requirements are not only costly to integrate, they may not
be possible due to real-world constraints of many systems (such
as endpoint detection). An alternative to these systems is a much
harder problem: detect DGA generation on a per domain basis
with no information except for the domain name. Previous work
to solve this harder problem exhibits poor performance and many
of these systems rely heavily on manual creation of features;
a time consuming process that can easily be circumvented by
malware authors..."
[Citation: Woodbridge et al. 2016: "Predicting Domain Generation Algorithms with Long Short-Term Memory Networks"]
A better alternative for real-world deployment would be to use "featureless deep learning" - we have a separate notebook where you can see how this can be implemented!
However, let's learn the basics first!!!
In [5]:
df_final = pd.read_csv('../../data/dga_features_final_df.csv')
print(df_final.isDGA.value_counts())
df_final.head()
Out[5]:
In [6]:
# Load dictionary of common english words from part 1
from six.moves import cPickle as pickle
with open('../../data/d_common_en_words.pickle', 'rb') as f:
    d = pickle.load(f)
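As a quick sanity check (optional, and assuming d is a plain Python dict mapping word/ngram strings to their counts, as described in part 1), you can peek at what was loaded:
In [ ]:
# Optional sanity check: inspect the loaded english word dictionary d.
# Assumes d behaves like a plain dict of string -> count.
print(type(d), 'with', len(d), 'entries')
print(list(d.items())[:5])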
To learn simple classification procedures using sklearn we have split the work flow into 5 steps.
First, prepare the data: define the target vector containing the URL labels (the isDGA column) and the feature matrix X holding the numeric features.
Tasks:
- Assign the isDGA column of df_final to a pandas Series named target.
- Drop the isDGA column from the df_final DataFrame and name the resulting pandas DataFrame 'feature_matrix'.
In [7]:
target = df_final['isDGA']
feature_matrix = df_final.drop(['isDGA'], axis=1)
print('Final features', feature_matrix.columns)
print( feature_matrix.head())
Tasks:
- Split the feature matrix and target labels into training and test sets using model_selection.train_test_split (here we hold out 25% of the data for testing).
In [8]:
# Simple Cross-Validation: Split the data set into training and test data
feature_matrix_train, feature_matrix_test, target_train, target_test = model_selection.train_test_split(feature_matrix, target, test_size=0.25, random_state=33)
In [48]:
feature_matrix_train.count()
Out[48]:
In [49]:
feature_matrix_test.count()
Out[49]:
In [50]:
target_train.head()
Out[50]:
Finally, we have prepared and segmented the data. Let's start classifying!!
Tasks:
- Create a tree.DecisionTreeClassifier() and train it by calling its .fit() function with the feature_matrix_train (X_train) and target_train data.
- Extract a row from the test data and check whether the classifier predicts it correctly.
If you are interested in trying a real unknown domain, you'll have to create a function to generate the features for that domain before you run it through the classifier (see the function is_dga a few cells below).
In [9]:
# Train a decision tree classifier with default parameters (Gini criterion)
clf = tree.DecisionTreeClassifier() # clf means classifier
clf = clf.fit(feature_matrix_train, target_train)
# Extract a row from the test data
test_feature = feature_matrix_test[192:193]
test_target = target_test[192:193]
# Make the prediction
pred = clf.predict(test_feature)
print('Predicted class:', pred)
print('Accurate prediction?', pred[0] == test_target.iloc[0])
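As an optional extra (not part of the original tasks), you can check which of the five features the trained tree relies on most via its feature_importances_ attribute:
In [ ]:
# Optional: print the feature importances of the fitted decision tree.
# feature_importances_ is a standard attribute of fitted sklearn tree models.
for name, importance in zip(feature_matrix.columns, clf.feature_importances_):
    print('%-12s %.3f' % (name, importance))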
In [52]:
feature_matrix_test
Out[52]:
In [10]:
# For simplicity let's just copy the needed function in here again
def H_entropy(x):
    # Calculate Shannon Entropy of the string x
    prob = [float(x.count(c)) / len(x) for c in dict.fromkeys(list(x))]
    H = -sum([p * np.log2(p) for p in prob])
    return H

def vowel_consonant_ratio(x):
    # Calculate vowel to consonant ratio
    x = x.lower()
    vowels_pattern = re.compile('([aeiou])')
    consonants_pattern = re.compile('([b-df-hj-np-tv-z])')
    vowels = re.findall(vowels_pattern, x)
    consonants = re.findall(consonants_pattern, x)
    try:
        ratio = len(vowels) / len(consonants)
    except ZeroDivisionError:  # catch zero division (no consonants)
        ratio = 0
    return ratio
# ngrams: Implementation according to Schiavoni 2014: "Phoenix: DGA-based Botnet Tracking and Intelligence"
# http://s2lab.isg.rhul.ac.uk/papers/files/dimva2014.pdf
def ngrams(word, n):
    # Extract all ngrams and return a regular Python list
    # Input word: can be a simple string or a list of strings
    # Input n: can be one integer or a list of integers
    # if you want to extract multiple ngrams and have them all in one list
    l_ngrams = []
    if isinstance(word, list):
        for w in word:
            if isinstance(n, list):
                for curr_n in n:
                    curr_ngrams = [w[i:i+curr_n] for i in range(0, len(w)-curr_n+1)]
                    l_ngrams.extend(curr_ngrams)
            else:
                curr_ngrams = [w[i:i+n] for i in range(0, len(w)-n+1)]
                l_ngrams.extend(curr_ngrams)
    else:
        if isinstance(n, list):
            for curr_n in n:
                curr_ngrams = [word[i:i+curr_n] for i in range(0, len(word)-curr_n+1)]
                l_ngrams.extend(curr_ngrams)
        else:
            curr_ngrams = [word[i:i+n] for i in range(0, len(word)-n+1)]
            l_ngrams.extend(curr_ngrams)
    return l_ngrams
def ngram_feature(domain, d, n):
    # Input is your domain string or list of domain strings,
    # a dictionary object d that contains the count for most common english words,
    # and finally n, either an int or a list of ints, defining the ngram length(s).
    # Core magic: looks up domain ngrams in the english dictionary ngrams and sums up the
    # respective english dictionary counts for the respective domain ngram;
    # the sum is normalized by the number of ngrams.
    l_ngrams = ngrams(domain, n)
    count_sum = 0
    for ngram in l_ngrams:
        if ngram in d:
            count_sum += d[ngram]
    try:
        feature = count_sum / (len(domain) - n + 1)
    except (ZeroDivisionError, TypeError):  # no ngrams, or n was passed as a list
        feature = 0
    return feature
def average_ngram_feature(l_ngram_feature):
    # input is a list of calls to ngram_feature(domain, d, n)
    # usually you would use various n values, like 1,2,3...
    return sum(l_ngram_feature) / len(l_ngram_feature)
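Before wrapping these helpers in a prediction function, here is a small illustrative sketch (using 'spardeingeld', one of the example domains tested below) that computes the five features for a single string in the order used by the classifier ('length', 'digits', 'entropy', 'vowel-cons', 'ngrams'):
In [ ]:
# Illustrative only: compute the five features for one example domain string.
# The feature order matches the is_dga() function in the next cell.
example = 'spardeingeld'
features = {
    'length': len(example),
    'digits': len(re.findall('[0-9]', example)),
    'entropy': H_entropy(example),
    'vowel-cons': vowel_consonant_ratio(example),
    'ngrams': average_ngram_feature([ngram_feature(example, d, 1),
                                     ngram_feature(example, d, 2),
                                     ngram_feature(example, d, 3)]),
}
print(features)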
In [11]:
def is_dga(domain, clf, d):
    # Function that takes a new domain string, the trained model 'clf' and
    # the dictionary d of most common english words as input,
    # and returns the prediction
    domain_features = np.empty([1, 5])
    # order of features is ['length', 'digits', 'entropy', 'vowel-cons', 'ngrams']
    domain_features[0, 0] = len(domain)
    pattern = re.compile('([0-9])')
    domain_features[0, 1] = len(re.findall(pattern, domain))
    domain_features[0, 2] = H_entropy(domain)
    domain_features[0, 3] = vowel_consonant_ratio(domain)
    domain_features[0, 4] = average_ngram_feature([ngram_feature(domain, d, 1),
                                                   ngram_feature(domain, d, 2),
                                                   ngram_feature(domain, d, 3)])
    pred = clf.predict(domain_features)
    return pred[0]

print('Prediction for domain %s [0 means legit, 1 means dga]:' % 'spardeingeld', is_dga('spardeingeld', clf, d))
print('Prediction for domain %s [0 means legit, 1 means dga]:' % 'google', is_dga('google', clf, d))
print('Prediction for domain %s [0 means legit, 1 means dga]:' % '1vxznov16031kjxneqjk1rtofi6', is_dga('1vxznov16031kjxneqjk1rtofi6', clf, d))
print('Prediction for domain %s [0 means legit, 1 means dga]:' % 'lthmqglxwmrwex', is_dga('lthmqglxwmrwex', clf, d))
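Note: because the classifier was trained on a pandas DataFrame, newer scikit-learn versions may warn about missing feature names when you pass it a bare numpy array as above. A minimal variant (a sketch only, assuming the columns of feature_matrix follow the same order as the comment in is_dga; the helper name is_dga_df is just illustrative) builds a one-row DataFrame instead:
In [ ]:
# Hypothetical variant of is_dga() that wraps the feature vector in a DataFrame
# with the training column names, to keep scikit-learn's feature-name check happy.
# Assumes feature_matrix columns follow the order ['length', 'digits', 'entropy', 'vowel-cons', 'ngrams'].
def is_dga_df(domain, clf, d):
    values = [len(domain),
              len(re.findall('[0-9]', domain)),
              H_entropy(domain),
              vowel_consonant_ratio(domain),
              average_ngram_feature([ngram_feature(domain, d, 1),
                                     ngram_feature(domain, d, 2),
                                     ngram_feature(domain, d, 3)])]
    row = pd.DataFrame([values], columns=feature_matrix.columns)
    return clf.predict(row)[0]

print(is_dga_df('google', clf, d))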
Tasks:
- Run the .predict() method on the clf, first with your training data (X_train) and then with your test data feature_matrix_test (X_test), and store the results in a variable called target_pred.
- Use sklearn metrics.accuracy_score to determine your model's accuracy: pass in target_test (the true labels/ground truth) and your model's predictions on the test portion target_pred. The advantage here is to see how your model performs on new data it has not seen during the training phase. The fair approach here is a simple cross-validation!
- Print out the confusion matrix using metrics.confusion_matrix.
In [12]:
# fair approach: make prediction on test data portion
target_pred = clf.predict(feature_matrix_test)
print(metrics.accuracy_score(target_test, target_pred))
print('Confusion Matrix\n', metrics.confusion_matrix(target_test, target_pred))
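For reference, here is a short sketch that unpacks the confusion matrix (assuming the labels are 0 for legit and 1 for dga as above, and scikit-learn's convention that rows are true labels and columns are predictions) and recomputes precision and recall by hand; the values should match the classification report below:
In [ ]:
# Unpack the 2x2 confusion matrix for the binary case and recompute
# precision and recall by hand (should match metrics.classification_report).
tn, fp, fn, tp = metrics.confusion_matrix(target_test, target_pred).ravel()
precision = tp / (tp + fp)
recall = tp / (tp + fn)
print('TN=%d  FP=%d  FN=%d  TP=%d' % (tn, fp, fn, tp))
print('precision=%.3f  recall=%.3f' % (precision, recall))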
In [13]:
# Classification Report...neat summary
print(metrics.classification_report(target_test, target_pred, target_names=['legit', 'dga']))
In [14]:
# short-cut
clf.score(feature_matrix_test, target_test)
Out[14]:
In [15]:
viz = ConfusionMatrix(clf)
viz.fit(feature_matrix_train, target_train)
viz.score(feature_matrix_test, target_test)
viz.poof()
In [16]:
viz = ClassificationReport(clf)
viz.fit(feature_matrix_train, target_train)
viz.score(feature_matrix_test, target_test)
viz.poof()
Tasks:
- Perform k-fold cross-validation on the full data set.
Short-cut: all of these steps can easily be achieved by simply using sklearn's model_selection.KFold() and model_selection.cross_val_score() functions.
In [59]:
cvKFold = model_selection.KFold(n_splits=3, shuffle=True, random_state=33)
cvKFold.get_n_splits(feature_matrix)
Out[59]:
In [60]:
scores = model_selection.cross_val_score(clf, feature_matrix, target, cv=cvKFold)
print(scores)
In [61]:
# Get average score +- Standard Error (https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.sem.html)
from scipy.stats import sem
def mean_score(scores):
    return "Mean score: {0:.3f} (+/- {1:.3f})".format(np.mean(scores), sem(scores))
print( mean_score( scores))
As an optional step, you can visualize your tree. The following code will generate a graph of your decision tree. You will need graphviz (http://www.graphviz.org) and pydotplus (or pydot) installed for this to work. The Griffon VM has this installed already, but if you try this on a Mac or Linux machine you will need to install graphviz.
In [ ]:
# These libraries are used to visualize the decision tree and require that you have GraphViz
# and pydot or pydotplus installed on your computer.
from io import StringIO  # sklearn.externals.six was removed in newer scikit-learn versions
from IPython.core.display import Image
import pydotplus as pydot
dot_data = StringIO()
tree.export_graphviz(clf, out_file=dot_data, feature_names=['length', 'digits', 'entropy', 'vowel-cons', 'ngrams'])
graph = pydot.graph_from_dot_data(dot_data.getvalue())
Image(graph.create_png())
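If graphviz or pydotplus is not available, an alternative sketch (assuming scikit-learn 0.21 or newer) is the built-in tree.plot_tree, which only needs matplotlib:
In [ ]:
# Alternative visualization without graphviz: scikit-learn's built-in plot_tree
# (available since scikit-learn 0.21). max_depth just limits the size of the plot.
fig, ax = plt.subplots(figsize=(16, 8))
tree.plot_tree(clf,
               feature_names=['length', 'digits', 'entropy', 'vowel-cons', 'ngrams'],
               class_names=['legit', 'dga'],
               filled=True,
               max_depth=3,
               ax=ax)
plt.show()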
In [63]:
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
In [65]:
#Create the Random Forest Classifier
random_forest_clf = RandomForestClassifier(n_estimators=10,
                                           max_depth=None,
                                           min_samples_split=2,
                                           random_state=0)
random_forest_clf = random_forest_clf.fit(feature_matrix_train, target_train)
In [64]:
#Next, create the SVM classifier
svm_classifier = svm.SVC()
svm_classifier = svm_classifier.fit(feature_matrix_train, target_train)
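To compare these two models with the decision tree, a quick sketch using the same hold-out test split and the .score() shortcut from earlier:
In [ ]:
# Compare hold-out accuracy of the three classifiers on the same test split.
print('Decision tree :', clf.score(feature_matrix_test, target_test))
print('Random forest :', random_forest_clf.score(feature_matrix_test, target_test))
print('SVM (SVC)     :', svm_classifier.score(feature_matrix_test, target_test))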
In [ ]: