ThreatExchange Data Dashboard

Purpose

The ThreatExchange APIs are designed to make consuming threat intelligence from multiple sources easy. This notebook will walk you through:

  • building an initial dashboard for assessing the data visible to your appID;
  • filtering down to a subset you consider high value; and
  • exporting the high value data to a file.

What you need

Before getting started, you'll need a few Python packages installed:

  • Pandas for data manipulation and analysis
  • Pytx for ThreatExchange access
  • Seaborn for making charts pretty

All of the Python packages mentioned can be installed via

pip install <package_name>
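
For example, to install all three at once:

pip install pandas pytx seaborn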

Set up a ThreatExchange access_token

If you don't already have an access_token for your app, use the Facebook Access Token Tool to get one.


In [ ]:
from pytx.access_token import access_token
from pytx.logger import setup_logger
from pytx.vocabulary import PrivacyType as pt

# Specify the location of your token via one of several ways:
# https://pytx.readthedocs.org/en/latest/pytx.access_token.html
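# For example, one of the options documented at the URL above is to pass
# your app-id and app-secret directly (placeholder values, substitute your own):
#   access_token(app_id='<your-app-id>', app_secret='<your-app-secret>')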
access_token()

Optionally, enable debug-level logging


In [ ]:
# Uncomment this if you want debug logging enabled
#setup_logger(log_file="pytx.log")

Search for data in ThreatExchange

Start by running a query against the ThreatExchange APIs to pull down any/all data relevant to you over a specified period of days.


In [ ]:
# Our basic search parameters; we default to querying over the past 14 days
days_back = 14
search_terms = ['abuse', 'phishing', 'malware', 'exploit', 'apt', 'ddos', 'brute', 'scan', 'cve']

Next, we execute the query using our search parameters and load the results into a Pandas DataFrame.


In [ ]:
from datetime import datetime, timedelta
from time import strftime
import pandas as pd
import re

from pytx import ThreatDescriptor
from pytx.vocabulary import ThreatExchange as te

# Define your search parameters; see
# https://pytx.readthedocs.org/en/latest/pytx.common.html#pytx.common.Common.objects
# for the full list of options
search_params = {
    te.FIELDS: ThreatDescriptor._default_fields,
    te.LIMIT: 1000,
    te.SINCE: strftime('%Y-%m-%d %H:%M:%S +0000', (datetime.utcnow() - timedelta(days=days_back)).timetuple()),
    te.TEXT: search_terms,
    te.UNTIL: strftime('%Y-%m-%d %H:%M:%S +0000', datetime.utcnow().timetuple()),
    te.STRICT_TEXT: False
}

data_frame = None
for search_term in search_terms:
    print "Searching for '%s' over -%d days" % (search_term, days_back)
    results = ThreatDescriptor.objects(
        fields=search_params[te.FIELDS],
        limit=search_params[te.LIMIT],
        text=search_term, 
        since=search_params[te.SINCE], 
        until=search_params[te.UNTIL],
        strict_text=search_params[te.STRICT_TEXT]
    )
    tmp = pd.DataFrame([result.to_dict() for result in results])
    tmp['search_term'] = search_term
    print "\t... found %d descriptors" % tmp.size
    if data_frame is None:
        data_frame = tmp
    else:
        data_frame = pd.concat([data_frame, tmp])
    
print "\nFound %d descriptors in total." % data_frame.size

Do some data munging for easier analysis, then preview the data as a sanity check.


In [ ]:
from time import mktime

# Extract a datetime and timestamp, for easier analysis
data_frame['ds'] = pd.to_datetime(data_frame.added_on.str[0:10], format='%Y-%m-%d')
data_frame['ts'] = pd.to_datetime(data_frame.added_on)

# Extract the owner data
owner = data_frame.pop('owner')
owner = owner.apply(pd.Series)
data_frame = pd.concat([data_frame, owner.email, owner.name], axis=1)

# Extract freeform 'tags' from the description
def extract_tags(text):
    return re.findall(r'\[([a-zA-Z0-9\:\-\_]+)\]', text)
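# e.g. extract_tags('[apt] [campaign:foo-1] phishing domain') returns ['apt', 'campaign:foo-1']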
data_frame['tags'] = data_frame.description.map(lambda x: [] if x is None else extract_tags(x))

data_frame.head(n=5)

Create a Dashboard to Get a High-level View

The raw data is great, but it would be much better if we could take a higher-level view of it. This dashboard will provide more insight into:

  • what data is available
  • who's sharing it
  • how it's labeled
  • how much of it is likely to be directly applicable for alerting

In [ ]:
import math
import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib inline

# Set up subplots for our dashboard
fig, axes = plt.subplots(nrows=4, ncols=2, figsize=(16,32))
axes[0,0].set_prop_cycle(color=sns.color_palette("coolwarm_r", 15))

# Plot by Type over time
type_over_time = data_frame.groupby(
    [pd.Grouper(freq='d', key='ds'), te.TYPE]
    ).count().unstack(te.TYPE)
type_over_time.added_on.plot(
    kind='line', 
    stacked=True, 
    title="Indicator Types Per Day (-" + str(days_back) + "d)",
    ax=axes[0,0]
)

# Plot by threat_type over time
tt_over_time = data_frame.groupby(
    [pd.Grouper(freq='w', key='ds'), 'threat_type']
    ).count().unstack('threat_type')
tt_over_time.added_on.plot(
    kind='bar', 
    stacked=True, 
    title="Threat Types Per Week (-" + str(days_back) + "d)",
    ax=axes[0,1]
)

# Plot the top 10 tags
tags = pd.DataFrame([item for sublist in data_frame.tags for item in sublist])
tags[0].value_counts().head(10).plot(
    kind='bar', 
    stacked=True,
    title="Top 10 Tags (-" + str(days_back) + "d)",
    ax=axes[1,0]
)

# Plot by who is sharing
owner_over_time = data_frame.groupby(
    [pd.Grouper(freq='w', key='ds'), 'name']
    ).count().unstack('name')
owner_over_time.added_on.plot(
    kind='bar', 
    stacked=True, 
    title="Who's Sharing Each Week? (-" + str(days_back) + "d)",
    ax=axes[1,1]
)

# Plot the data as a timeseries of when it was published
data_over_time = data_frame.groupby(pd.Grouper(freq='6H', key='ts')).count()
data_over_time.added_on.plot(
    kind='line',
    title="Data shared over time (-" + str(days_back) + "d)",
    ax=axes[2,0]
)

# Plot by status label
data_frame.status.value_counts().plot(
    kind='pie', 
    title="Threat Statuses (-" + str(days_back) + "d)",
    ax=axes[2,1]
)

# Heatmap by type / source
owner_and_type = pd.DataFrame(data_frame[['name', 'type']])
owner_and_type['n'] = 1
grouped = owner_and_type.groupby(['name', 'type']).count().unstack('type').fillna(0)
ax = sns.heatmap(
    data=grouped['n'], 
    robust=True,
    cmap="YlGnBu",
    ax=axes[3,0]
)

# These require a little data munging
# translate a severity enum to a value
# TODO Add this translation to Pytx
def severity_value(severity):
    if severity == 'UNKNOWN': return 0
    elif severity == 'INFO': return 1
    elif severity == 'WARNING': return 3
    elif severity == 'SUSPICIOUS': return 5
    elif severity == 'SEVERE': return 7
    elif severity == 'APOCALYPSE': return 10
    return 0
# translate a value back to a severity enum
def value_severity(severity):
    if severity >= 9: return 'APOCALYPSE'
    elif severity >= 6: return 'SEVERE'
    elif severity >= 4: return 'SUSPICIOUS'
    elif severity >= 2: return 'WARNING'
    elif severity >= 1: return 'INFO'
    elif severity >= 0: return 'UNKNOWN'
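# e.g. severity_value('WARNING') == 3 and value_severity(3) == 'WARNING'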

# Plot by how actionable the data is  
# Build a special dataframe and chart it
data_frame['severity_value'] = data_frame.severity.apply(severity_value)
df2 = pd.DataFrame({'count' : data_frame.groupby(['name', 'confidence', 'severity_value']).size()}).reset_index()
ax = df2.plot(
    kind='scatter', 
    x='severity_value', y='confidence', 
    xlim=(-1,11), ylim=(-10,110), 
    title='Data by Conf / Sev With Threshold Line',
    ax=axes[3,1],
    s=df2['count'].apply(lambda x: 1000 * math.log10(x))
)
# Draw a threshold line for the data we'd likely consider using for alerts (aka 'high value')
ax.plot([2,10], [100,0], c='red')

Dive A Little Deeper

Take a subset of the data and understand it a little more.

In this example, we presume that we'd like to take phishing-related data and study it, to see if we can use it to better defend a corporate network or fight abuse in a product.

As a simple example, we'll filter down to data labeled MALICIOUS whose description contains the word phish, to see if we can draw a more detailed conclusion about how to apply the data to our existing internal workflows.


In [ ]:
from pytx.vocabulary import Status as s


phish_data = data_frame[(data_frame.status == s.MALICIOUS)
                        & data_frame.description.apply(lambda x: 'phish' in x if x is not None else False)]
# TODO: also filter for attack_type == PHISHING, when Pytx supports it

%matplotlib inline

# Set up subplots for our deeper-dive plots
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(16,8))

# Heatmap of type / source
owner_and_type = pd.DataFrame(phish_data[['name', 'type']])
owner_and_type['n'] = 1
grouped = owner_and_type.groupby(['name', 'type']).count().unstack('type').fillna(0)
ax = sns.heatmap(
    data=grouped['n'], 
    robust=True,
    cmap="YlGnBu",
    ax=axes[0]
)

# Tag breakdown of the top 10 tags
tags = pd.DataFrame([item for sublist in phish_data.tags for item in sublist])
tags[0].value_counts().head(10).plot(
    kind='pie',
    title="Top 10 Tags (-" + str(days_back) + "d)",
    ax=axes[1]
)

Extract The High Confidence / Severity Data For Use

With a better understanding of the data, let's filter the data labeled MALICIOUS and REVIEWED_MANUALLY down to what passes a predetermined confidence + severity threshold.

You can add more filters, or change the threshold, as you see fit.


In [ ]:
from pytx.vocabulary import ReviewStatus as rs

# Define our threshold line, which is the same as the red threshold line in the chart above
sev_min = 2
sev_max = 10
conf_min = 0
conf_max = 100

# build a new series, to indicate if a row passes our confidence + severity threshold
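# (a 2-D cross product test: positive when the point (sev, conf) lies on the
# high side of the line from (sev_min, conf_max) to (sev_max, conf_min))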
def is_high_value(conf, sev):
    return (((sev_max - sev_min) * (conf - conf_max)) - ((conf_min - conf_max) * (sev - sev_min))) > 0
data_frame['is_high_value'] = data_frame.apply(lambda x: is_high_value(x.confidence, x.severity_value), axis=1)

# Filter down to just the data passing our criteria; you can add more filters here (by type, source, etc.)
high_value_data = data_frame[data_frame.is_high_value 
                            & (data_frame.status == s.MALICIOUS)
                            & (data_frame.review_status == rs.REVIEWED_MANUALLY)].reset_index(drop=True)

# get a count of how much we kept
print "Kept %d of %d data as high value" % (high_value_data.size, data_frame.size)

# ... and preview it
high_value_data.head()
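
You can also narrow the high-value set further before exporting. For example, here's a minimal sketch that keeps only network-style indicators (the type strings are assumptions; check data_frame.type.unique() for the values actually present in your data):


In [ ]:
# Optional: keep only network-style indicators
# (hypothetical type list; verify against data_frame.type.unique())
network_types = ['DOMAIN', 'IP_ADDRESS', 'URI']
high_value_network = high_value_data[high_value_data.type.isin(network_types)]
high_value_network.head()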

Now, output all of the high value data to a file as CSV or JSON, for consumption in our other systems and workflows.


In [ ]:
use_csv = False

if use_csv:
    file_name = 'threat_exchange_high_value.csv'
    high_value_data.to_csv(path_or_buf=file_name)
    print "CSV data written to %s" % file_name
else:
    file_name = 'threat_exchange_high_value.json'
    high_value_data.to_json(path_or_buf=file_name, orient='index')
    print "JSON data written to %s" % file_name
