Malicious Domain Classifier


In [1]:
import logging
logger = logging.getLogger("SR")
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(ch)
logger.setLevel(logging.DEBUG)

Create all the transformers and estimators we need (this part really ought to be done more carefully).


In [2]:
import itertools as it
from sklearn.base import BaseEstimator, TransformerMixin, ClassifierMixin
from tldextract import TLDExtract
from binlog_reader import binlog_reader
import numpy as np

class GroupActivity(BaseEstimator, TransformerMixin):

    def _similarity(self, ips_1, ips_2):
        a, b = float(len(ips_1)), float(len(ips_2))
        c = float(len(ips_1 & ips_2))
        return 0.5 * (c/a + c/b), c / len(ips_1 | ips_2), int(c)
    
    def fit(self, x, y=None):
        return self

    def transform(self, d2ip_idx_list):
        logger.debug("Start calc group activity")
        all_domains = list(it.chain.from_iterable([d2ip.keys() for d2ip in d2ip_idx_list]))
        result = {domain: dict() for domain in all_domains}
        pairs = list(it.combinations(range(len(d2ip_idx_list)), 2))
        ptn = "({}, {})_{}"

        for idx, domain in enumerate(all_domains):
            for l, r in pairs:
                left, right = d2ip_idx_list[l].get(domain), d2ip_idx_list[r].get(domain)
                sim, jacc, ln = 0., 0., 0.
                if left and right:
                    sim, jacc, ln = self._similarity(left, right)
                result[domain].update({ptn.format(l, r, "sim"): sim,
                                       ptn.format(l, r, "jcd"): jacc,
                                       ptn.format(l, r, "length"): ln})
            if (idx + 1) % 100000 == 0:
                logger.debug("Processed %d domains", idx + 1)
                
        return result
        
    
    
class IndexCreator(BaseEstimator, TransformerMixin):
    def fit(self, x, y=None):
        return self
    
    def transform(self, queries_all):
        result = []
        for idx, queries in enumerate(queries_all):
            logger.debug("Creating index, part %d/%d", idx + 1, len(queries_all))
            ip2d, d2ip = dict(), dict()
            for ip, domain in queries:
                ip2d.setdefault(ip, set()).add(domain)
                d2ip.setdefault(domain, set()).add(ip)
            result.append({"ip2d": ip2d, "d2ip": d2ip})
        return result


class IndexMerger(BaseEstimator, TransformerMixin):
    def fit(self, x, y=None):
        return self
    
    def transform(self, indexes):
        ip2d_full, d2ip_full = dict(), dict()

        for curr in indexes:
            for domain in curr['d2ip']:
                d2ip_full.setdefault(domain, set())
                d2ip_full[domain] |= curr['d2ip'][domain]

            for ip in curr["ip2d"]:
                ip2d_full.setdefault(ip, set())
                ip2d_full[ip] |= curr['ip2d'][ip]
        return ip2d_full, d2ip_full
    

class PairExtractor(BaseEstimator, TransformerMixin):
    def __init__(self):
        self._extract = TLDExtract(include_psl_private_domains=True)
    
    def sanitize(self, domain):
        return self._extract(domain).registered_domain

    def fit(self, x, y=None):
        return self
    
    def transform(self, queries_all):
        result = []
        for idx, queries in enumerate(queries_all):
            logger.debug("Transforming domains, part %d/%d", idx + 1,  len(queries_all))
            current = [(ip, self.sanitize(domain)) for ip, domain in queries]
            current = filter(lambda (ip, domain): domain, current)
            result.append(set(current))
        return result


class BLReader(BaseEstimator, TransformerMixin):
    def __init__(self, fields):
        self.fields = fields

    def fit(self, x, y=None):
        return self
    
    def transform(self, fnames):
        result = []
        for idx, fn in enumerate(fnames):
            with open(fn, 'rb') as infile:
                logger.debug("Read file %s, part %d/%d", fn, idx + 1, len(fnames))
                reader = binlog_reader(infile, self.fields)
                current = [tuple([query[fld] for fld in self.fields]) for query in reader]
                result.append(set(current))
        return result


class PosNegDomainReader(BaseEstimator, TransformerMixin):
    def __init__(self):
        self._extract = TLDExtract(include_psl_private_domains=True)

    def sanitize(self, domain):
        return self._extract(domain).registered_domain
        
    def fit(self, x, y=None):
        return self
    
    def transform(self, fnames):
        if len(fnames) != 2:
            raise RuntimeError("Must be 2 files: positive and negative")
        result = []
        for fn in fnames:
            with open(fn, 'rb') as infile:
                logger.debug("Read file %s", fn)
                current = [self.sanitize(line.strip()) for line in infile]
                current = [domain for domain in current if domain]
                result.append(set(current))
        return result


class FormXy(BaseEstimator, TransformerMixin):
    def __init__(self, all_domains):
        self.all_domains = set(all_domains)

    def fit(self, x, y=None):
        return self
    
    def transform(self, lst):
        if len(lst) != 2:
            raise RuntimeError("Must be list with 2 sets: positive and negative")
        pos, neg = lst
        pos, neg = set(pos), set(neg)
        inter = pos & neg
        logger.debug("Positive size %d, Negative size %d, Intersection %d", len(pos), len(neg), len(inter))
        pos, neg = (pos - inter) & self.all_domains, (neg - inter) & self.all_domains
        logger.debug("Positive after %d, Negative after %d", len(pos), len(neg))
        X = list(pos) + list(neg)
        y = [1] * len(pos) + [-1] * len(neg)
        return np.array(X), np.array(y)


class ItemSelector(BaseEstimator, TransformerMixin):
    def __init__(self, key):
        self.key = key

    def fit(self, x, y=None):
        return self

    def transform(self, indexes_all):
        return [idx[self.key] for idx in indexes_all]
    

class SecureRank(ClassifierMixin):
    def __init__(self, ip2d, d2ip, max_iter=20, init_abs_score=10.):
        self.ip2d = ip2d
        self.d2ip = d2ip
        self.max_iter = max_iter
        self.default_score = init_abs_score
        self.f_ord = ['sc_score', 'black_score', 'white_score']
    
    def fit(self, domain, d_class):
        logger.debug("Initialize scores")
        self.rank_ip = {ip: {'sc_score': 0., 'black_score': 0., 'white_score': 0.} for ip in self.ip2d}
        self.rank_d = {d:  {'sc_score': 0., 'black_score': 0., 'white_score': 0.} for d in self.d2ip}
        
        for dom, cls in zip(domain, d_class):
            if cls == 1:
                self.rank_d[dom]['sc_score'] = -float(self.default_score)
                self.rank_d[dom]['black_score'] = -float(self.default_score)
                
            elif cls == -1:
                self.rank_d[dom]['sc_score'] = float(self.default_score)
                self.rank_d[dom]['white_score'] = float(self.default_score)

        
        # Score propagation over the bipartite IP <-> domain graph: each IP
        # inherits the scores of the domains it queried (diluted by each
        # domain's IP fan-out), then domains inherit back from their IPs.
        for iteration in range(self.max_iter):
            logger.debug("Iteration %d", iteration + 1)
            
            for ip in self.rank_ip:
                self.rank_ip[ip]['sc_score'] = sum(self.rank_d[d]['sc_score']/len(self.d2ip[d]) 
                                                   for d in self.ip2d[ip])
                self.rank_ip[ip]['black_score'] = sum(self.rank_d[d]['black_score']/len(self.d2ip[d]) 
                                                      for d in self.ip2d[ip])
                self.rank_ip[ip]['white_score'] =  sum(self.rank_d[d]['white_score']/len(self.d2ip[d]) 
                                                       for d in self.ip2d[ip])

            for domain in self.rank_d:
                self.rank_d[domain]['sc_score'] = sum(self.rank_ip[ip]['sc_score']/len(self.ip2d[ip]) 
                                                      for ip in self.d2ip[domain])
                self.rank_d[domain]['black_score'] = sum(self.rank_ip[ip]['black_score']/len(self.ip2d[ip]) 
                                                         for ip in self.d2ip[domain])
                self.rank_d[domain]['white_score'] = sum(self.rank_ip[ip]['white_score']/len(self.ip2d[ip]) 
                                                         for ip in self.d2ip[domain])
                
        return self
                
    def transform(self, domains):
        return [[self.rank_d[dom][f] for f in self.f_ord] for dom in domains]


class IndentityOp(BaseEstimator, TransformerMixin, ClassifierMixin):
    def __init__(self, info):
        self.info = info
        self.f_ord = list(self.info[next(iter(self.info))].keys())
    
    def fit(self, X, y):
        return self
    
    def transform(self, X):
        return [[self.info[x][f] for f in self.f_ord] for x in X]
    
def perf_measure(y_pred, y_test):
    tp, tn, fp, fn = 0, 0, 0, 0
    for real, pred in zip(y_test, y_pred):
        if real == pred:
            if real == 1:
                tp += 1
            else:
                tn += 1
        else:
            if real == 1:
                fn += 1  # actual positive, predicted negative
            else:
                fp += 1  # actual negative, predicted positive
    return tp, tn, fp, fn
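
As a quick illustration of the group-activity features, here is a minimal sanity check on hypothetical toy data (two hourly d2ip indexes sharing one client IP); the domain name and IPs are made up for the example:

toy = [
    {"evil.example": {"1.1.1.1", "2.2.2.2"}},
    {"evil.example": {"2.2.2.2", "3.3.3.3"}},
]
features = GroupActivity().transform(toy)
print(features["evil.example"])
# expected: {'(0, 1)_sim': 0.5, '(0, 1)_jcd': 0.333..., '(0, 1)_length': 1}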

Read the files, process them, and compute the group activity.


In [3]:
import itertools as it
from sklearn.pipeline import Pipeline, FeatureUnion

"""
fnames = [
    "/home/ivan/dns-03-aprl/querylog.bin.33-1459677601",
    "/home/ivan/dns-03-aprl/querylog.bin.33-1459681201",
    "/home/ivan/dns-03-aprl/querylog.bin.33-1459684801",
]

apriori_fnames = [
     "/home/ivan/3_aprl_blacklist",
     "/home/ivan/dns-whitelist/top-1m",
]
"""

fnames = [
    "/home/ivan/dns-13-aprl/querylog.bin.33-1460530801",
    "/home/ivan/dns-13-aprl/querylog.bin.33-1460534401",
    "/home/ivan/dns-13-aprl/querylog.bin.33-1460538001",
    "/home/ivan/dns-13-aprl/querylog.bin.33-1460541601",
    "/home/ivan/dns-13-aprl/querylog.bin.33-1460545201",
    "/home/ivan/dns-13-aprl/querylog.bin.33-1460548801",
]

apriori_fnames = [
     "/home/ivan/13_aprl_blacklist",
     "/home/ivan/dns-whitelist/top-1m",
]

preprocessing = Pipeline([
        ('raw_pairs', BLReader(fields=["client_ip", "dname"])),
        ('clean_pairs', PairExtractor()),
        ('indexes', IndexCreator())
])
group_activity = Pipeline([
        ("selector", ItemSelector(key="d2ip")),
        ("grp_act", GroupActivity())
])

indexes = preprocessing.transform(fnames)
ip2d_full, d2ip_full = IndexMerger().transform(indexes)
group_act = group_activity.transform(indexes)


2016-04-13 18:35:17,140 - DEBUG - Read file /home/ivan/dns-13-aprl/querylog.bin.33-1460530801, part 1/6
2016-04-13 18:35:45,597 - DEBUG - Read file /home/ivan/dns-13-aprl/querylog.bin.33-1460534401, part 2/6
2016-04-13 18:36:19,287 - DEBUG - Read file /home/ivan/dns-13-aprl/querylog.bin.33-1460538001, part 3/6
2016-04-13 18:36:56,527 - DEBUG - Read file /home/ivan/dns-13-aprl/querylog.bin.33-1460541601, part 4/6
2016-04-13 18:37:33,821 - DEBUG - Read file /home/ivan/dns-13-aprl/querylog.bin.33-1460545201, part 5/6
2016-04-13 18:38:16,428 - DEBUG - Read file /home/ivan/dns-13-aprl/querylog.bin.33-1460548801, part 6/6
2016-04-13 18:39:02,536 - DEBUG - Transforming domains, part 1/6
2016-04-13 18:39:24,944 - DEBUG - Transforming domains, part 2/6
2016-04-13 18:39:49,730 - DEBUG - Transforming domains, part 3/6
2016-04-13 18:40:17,259 - DEBUG - Transforming domains, part 4/6
2016-04-13 18:40:47,249 - DEBUG - Transforming domains, part 5/6
2016-04-13 18:41:17,491 - DEBUG - Transforming domains, part 6/6
2016-04-13 18:41:54,057 - DEBUG - Creating index, part 1/6
2016-04-13 18:41:57,959 - DEBUG - Creating index, part 2/6
2016-04-13 18:42:02,600 - DEBUG - Creating index, part 3/6
2016-04-13 18:42:08,221 - DEBUG - Creating index, part 4/6
2016-04-13 18:42:12,644 - DEBUG - Creating index, part 5/6
2016-04-13 18:42:17,651 - DEBUG - Creating index, part 6/6
2016-04-13 18:42:37,275 - DEBUG - Start calc group activity
2016-04-13 18:42:44,563 - DEBUG - Processed 100000 domains
2016-04-13 18:42:50,598 - DEBUG - Processed 200000 domains
2016-04-13 18:42:57,393 - DEBUG - Processed 300000 domains
2016-04-13 18:43:03,066 - DEBUG - Processed 400000 domains
2016-04-13 18:43:08,365 - DEBUG - Processed 500000 domains
2016-04-13 18:43:12,871 - DEBUG - Processed 600000 domains
2016-04-13 18:43:17,468 - DEBUG - Processed 700000 domains
2016-04-13 18:43:21,986 - DEBUG - Processed 800000 domains
2016-04-13 18:43:26,456 - DEBUG - Processed 900000 domains
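
A quick shape check on what the pipeline produced (illustrative, not part of the original run): ip2d_full and d2ip_full map each side of the bipartite graph to its set of neighbours, and group_act maps every domain to its 3 * C(6, 2) = 45 pairwise-hour features (sim, jcd, length for each of the 15 hour pairs):

print(len(ip2d_full), len(d2ip_full), len(group_act))
sample = next(iter(group_act))
print(sample, len(group_act[sample]))  # expect 45 features per domain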

Load and clean the training set.


In [4]:
read_aprior_info = Pipeline([
        ("reader", PosNegDomainReader()),
        ("form_sets", FormXy(group_act.keys()))
])
X, y = read_aprior_info.transform(apriori_fnames)


2016-04-13 18:43:27,460 - DEBUG - Read file /home/ivan/13_aprl_blacklist
2016-04-13 18:43:32,034 - DEBUG - Read file /home/ivan/dns-whitelist/top-1m
2016-04-13 18:43:37,407 - DEBUG - Positive size 886849, Negative size 959379, Intersection 875
2016-04-13 18:43:37,824 - DEBUG - Positive after 3923, Negative after 83564
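
Note the heavy class imbalance after cleaning (3923 positives vs 83564 negatives per the log above), which is worth keeping in mind when reading the precision numbers below; a quick check, assuming the arrays from the cell above:

print("positives: %d, negatives: %d" % ((y == 1).sum(), (y == -1).sum()))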

Validate the result.


In [5]:
from sklearn.cross_validation import cross_val_score, train_test_split
from sklearn.metrics import roc_auc_score, precision_score, classification_report
from sklearn.ensemble import AdaBoostClassifier
from sklearn.grid_search import GridSearchCV


union = FeatureUnion(
    transformer_list=[
        ('ranking', SecureRank(ip2d_full, d2ip_full, max_iter=20)),
        ('group_activities', IndentityOp(group_act))
    ])

clf = Pipeline([
        ('preparation', union),
        ('final_score', AdaBoostClassifier(random_state=42))
    ])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
clf.fit(X_train, y_train)


2016-04-13 18:43:38,255 - DEBUG - Initialize scores
2016-04-13 18:43:38,566 - DEBUG - Iteration 1
2016-04-13 18:43:58,235 - DEBUG - Iteration 2
2016-04-13 18:44:18,432 - DEBUG - Iteration 3
2016-04-13 18:44:38,644 - DEBUG - Iteration 4
2016-04-13 18:44:58,798 - DEBUG - Iteration 5
2016-04-13 18:45:18,965 - DEBUG - Iteration 6
2016-04-13 18:45:39,093 - DEBUG - Iteration 7
2016-04-13 18:45:59,261 - DEBUG - Iteration 8
2016-04-13 18:46:19,402 - DEBUG - Iteration 9
2016-04-13 18:46:39,589 - DEBUG - Iteration 10
2016-04-13 18:46:59,742 - DEBUG - Iteration 11
2016-04-13 18:47:19,892 - DEBUG - Iteration 12
2016-04-13 18:47:40,097 - DEBUG - Iteration 13
2016-04-13 18:48:00,284 - DEBUG - Iteration 14
2016-04-13 18:48:20,471 - DEBUG - Iteration 15
2016-04-13 18:48:40,676 - DEBUG - Iteration 16
2016-04-13 18:49:00,795 - DEBUG - Iteration 17
2016-04-13 18:49:20,944 - DEBUG - Iteration 18
2016-04-13 18:49:41,086 - DEBUG - Iteration 19
2016-04-13 18:50:01,198 - DEBUG - Iteration 20
Out[5]:
Pipeline(steps=[('preparation', FeatureUnion(n_jobs=1,
       transformer_list=[('ranking', <__main__.SecureRank object at 0x7faa295cd150>), ('group_activities', IndentityOp(info={'samasyangin.com': {'(1, 3)_length': 0.0, '(0, 1)_jcd': 0.0, '(0, 5)_jcd': 0.0, '(2, 4)_length': 0.0, '(4, 5)_jcd': 0.0, '(3, 4)_...thm='SAMME.R', base_estimator=None,
          learning_rate=1.0, n_estimators=50, random_state=42))])

In [6]:
y_pred_roc = clf.predict_proba(X_test)[:,1]
y_pred = clf.predict(X_test)

In [14]:
clf.fit(X, y)


2016-04-13 19:08:49,167 - DEBUG - Initialize scores
2016-04-13 19:08:49,771 - DEBUG - Iteration 1
2016-04-13 19:09:09,509 - DEBUG - Iteration 2
2016-04-13 19:09:29,801 - DEBUG - Iteration 3
2016-04-13 19:09:50,069 - DEBUG - Iteration 4
2016-04-13 19:10:10,378 - DEBUG - Iteration 5
2016-04-13 19:10:30,644 - DEBUG - Iteration 6
2016-04-13 19:10:50,850 - DEBUG - Iteration 7
2016-04-13 19:11:11,192 - DEBUG - Iteration 8
2016-04-13 19:11:31,430 - DEBUG - Iteration 9
2016-04-13 19:11:51,708 - DEBUG - Iteration 10
2016-04-13 19:12:12,110 - DEBUG - Iteration 11
2016-04-13 19:12:32,303 - DEBUG - Iteration 12
2016-04-13 19:12:52,517 - DEBUG - Iteration 13
2016-04-13 19:13:12,835 - DEBUG - Iteration 14
2016-04-13 19:13:33,041 - DEBUG - Iteration 15
2016-04-13 19:13:53,211 - DEBUG - Iteration 16
2016-04-13 19:14:13,614 - DEBUG - Iteration 17
2016-04-13 19:14:33,889 - DEBUG - Iteration 18
2016-04-13 19:14:54,150 - DEBUG - Iteration 19
2016-04-13 19:15:14,470 - DEBUG - Iteration 20
Out[14]:
Pipeline(steps=[('preparation', FeatureUnion(n_jobs=1,
       transformer_list=[('ranking', <__main__.SecureRank object at 0x7faa295cd150>), ('group_activities', IndentityOp(info={'samasyangin.com': {'(1, 3)_length': 0.0, '(0, 1)_jcd': 0.0, '(0, 5)_jcd': 0.0, '(2, 4)_length': 0.0, '(4, 5)_jcd': 0.0, '(3, 4)_...thm='SAMME.R', base_estimator=None,
          learning_rate=1.0, n_estimators=50, random_state=42))])

In [15]:
X_graph = d2ip_full.keys()
y_graph_prob = clf.predict_proba(X_graph)[:, np.where(clf.classes_ == 1)[0][0]]
y_graph_labels = clf.predict(X_graph)

In [17]:
result = sorted(zip(X_graph, y_graph_prob, y_graph_labels), key=lambda x: x[1], reverse=True)
with open('/home/ivan/13-aprl-lastresult.txt', 'w') as outfile:
    for domain, prob, lab in result:
        outfile.write("{}\t{}\t{}\n".format(domain, prob, lab))

In [ ]:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
%matplotlib inline
from pylab import rcParams
rcParams['figure.figsize'] = 15, 15

In [ ]:
y_new = clf.predict_proba(X_test)
plt.hist(y_new[:,1][np.where(y_test > 0)], bins=30, color='r', alpha=0.5, normed=True)
plt.hist(y_new[:,1][np.where(y_test < 0)], bins=30, color='b', alpha=0.5, normed=True)
plt.show()

In [7]:
from sklearn.metrics import roc_auc_score, precision_score, classification_report
print("ROC-AUC {}".format(roc_auc_score(y_test, y_pred)))
print("PRECISION {}".format(precision_score(y_test, y_pred)))
print(classification_report(y_test, y_pred))
TP, TN, FP, FN = perf_measure(y_pred, y_test)
print("TP:{} | FP: {}| TN: {}| FN: {}".format(TP, FP, TN, FN))


ROC-AUC 0.971075609507
PRECISION 0.992869875223
             precision    recall  f1-score   support

         -1       1.00      1.00      1.00     25065
          1       0.99      0.94      0.97      1182

avg / total       1.00      1.00      1.00     26247

TP: 1114 | FP: 8 | TN: 25057 | FN: 68
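
The ROC-AUC above is computed on hard labels, which for a binary classifier reduces to (TPR + TNR) / 2; a threshold-free variant would use the probabilities already computed in In [6], as sketched here (no output recorded, value will differ from the label-based score):

print("ROC-AUC (proba) {}".format(roc_auc_score(y_test, y_pred_roc)))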