Purpose
The ThreatExchange APIs are designed to make it easy to consume threat intelligence from multiple sources. This notebook will walk you through:
- searching ThreatExchange for threat descriptors across a set of terms
- munging the results into a Pandas DataFrame for easier analysis
- building a dashboard of charts to get a higher-level view of the data
- filtering down to the data most likely to be high value
- exporting that data as CSV or JSON for other systems and workflows
What you need
Before getting started, you'll need a few Python packages installed:
- pytx
- pandas
- matplotlib
- seaborn
All of the Python packages mentioned can be installed via
pip install <package_name>
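For example, assuming a standard pip setup, you can install them all in one go:
pip install pytx pandas matplotlib seaborn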
You'll also need an access_token for your app. If you don't already have one, use the Facebook Access Token Tool to get one.
In [ ]:
from pytx.access_token import access_token
from pytx.logger import setup_logger
from pytx.vocabulary import PrivacyType as pt
# Specify the location of your token via one of several ways:
# https://pytx.readthedocs.org/en/latest/pytx.access_token.html
access_token()
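If you'd rather be explicit, the linked docs list several ways to supply the token. A minimal sketch, assuming your pytx version accepts explicit app credentials (verify the keyword names against the access_token docs linked above):
In [ ]:
# Hypothetical alternative: derive the token from an app id and secret
# (the 'app-id|app-secret' form); keyword names per the linked pytx docs.
access_token(app_id='<your-app-id>', app_secret='<your-app-secret>')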
Optionally, enable debug level logging
In [ ]:
# Uncomment this if you want debug logging enabled
#setup_logger(log_file="pytx.log")
In [ ]:
# Our basic search parameters; we default to querying over the past 14 days
days_back = 14
search_terms = ['abuse', 'phishing', 'malware', 'exploit', 'apt', 'ddos', 'brute', 'scan', 'cve']
Next, we execute the query using our search parameters and put the results in a Pandas DataFrame
In [ ]:
from datetime import datetime, timedelta
from time import strftime
import pandas as pd
import re
from pytx import ThreatDescriptor
from pytx.vocabulary import ThreatExchange as te
# Define your search string and other params, see
# https://pytx.readthedocs.org/en/latest/pytx.common.html#pytx.common.Common.objects
# for the full list of options
search_params = {
    te.FIELDS: ThreatDescriptor._default_fields,
    te.LIMIT: 1000,
    te.SINCE: strftime('%Y-%m-%d %H:%M:%S +0000', (datetime.utcnow() + timedelta(days=(-1 * days_back))).timetuple()),
    te.TEXT: search_terms,
    te.UNTIL: strftime('%Y-%m-%d %H:%M:%S +0000', datetime.utcnow().timetuple()),
    te.STRICT_TEXT: False
}
data_frame = None
for search_term in search_terms:
    print("Searching for '%s' over -%d days" % (search_term, days_back))
    results = ThreatDescriptor.objects(
        fields=search_params[te.FIELDS],
        limit=search_params[te.LIMIT],
        text=search_term,
        since=search_params[te.SINCE],
        until=search_params[te.UNTIL],
        strict_text=search_params[te.STRICT_TEXT]
    )
    tmp = pd.DataFrame([result.to_dict() for result in results])
    tmp['search_term'] = search_term
    print("\t... found %d descriptors" % len(tmp))
    if data_frame is None:
        data_frame = tmp
    else:
        data_frame = pd.concat([data_frame, tmp], ignore_index=True)
print("\nFound %d descriptors in total." % len(data_frame))
Do some data munging for easier analysis and then preview as a sanity check
In [ ]:
from time import mktime
# Extract a datetime and timestamp, for easier analysis
data_frame['ds'] = pd.to_datetime(data_frame.added_on.str[0:10], format='%Y-%m-%d')
data_frame['ts'] = pd.to_datetime(data_frame.added_on)
# Extract the owner data
owner = data_frame.pop('owner')
owner = owner.apply(pd.Series)
data_frame = pd.concat([data_frame, owner.email, owner.name], axis=1)
# Extract freeform 'tags' in the description
def extract_tags(text):
    return re.findall(r'\[([a-zA-Z0-9\:\-\_]+)\]', text)
data_frame['tags'] = data_frame.description.map(lambda x: [] if x is None else extract_tags(x))
data_frame.head(n=5)
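The bracketed [tag] convention is just a regex over the free-text description field; for example (with a made-up description string):
In [ ]:
# Illustrative only: two tags are pulled out of this example description
extract_tags('Callback C2 seen in the wild [apt] [cve-2015-2545]')
# -> ['apt', 'cve-2015-2545']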
The raw data is great, but it would be much better if we could take a higher-level view of it. This dashboard will provide more insight into:
- which indicator and threat types are being shared, and how that changes over time
- who is sharing the data, and how often
- the statuses and freeform tags applied to the data
- how actionable the data is, based on confidence and severity
In [ ]:
import math
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
# Setup subplots for our dashboard
fig, axes = plt.subplots(nrows=4, ncols=2, figsize=(16, 32))
axes[0, 0].set_prop_cycle(color=sns.color_palette("coolwarm_r", 15))
# Plot by Type over time
type_over_time = data_frame.groupby(
    [pd.Grouper(freq='d', key='ds'), te.TYPE]
).count().unstack(te.TYPE)
type_over_time.added_on.plot(
    kind='line',
    stacked=True,
    title="Indicator Types Per Day (-" + str(days_back) + "d)",
    ax=axes[0, 0]
)
# Plot by threat_type over time
tt_over_time = data_frame.groupby(
    [pd.Grouper(freq='w', key='ds'), 'threat_type']
).count().unstack('threat_type')
tt_over_time.added_on.plot(
    kind='bar',
    stacked=True,
    title="Threat Types Per Week (-" + str(days_back) + "d)",
    ax=axes[0, 1]
)
# Plot the top 10 tags
tags = pd.DataFrame([item for sublist in data_frame.tags for item in sublist])
tags[0].value_counts().head(10).plot(
    kind='bar',
    stacked=True,
    title="Top 10 Tags (-" + str(days_back) + "d)",
    ax=axes[1, 0]
)
# Plot by who is sharing
owner_over_time = data_frame.groupby(
    [pd.Grouper(freq='w', key='ds'), 'name']
).count().unstack('name')
owner_over_time.added_on.plot(
    kind='bar',
    stacked=True,
    title="Who's Sharing Each Week? (-" + str(days_back) + "d)",
    ax=axes[1, 1]
)
# Plot the data as a timeseries of when it was published
data_over_time = data_frame.groupby(pd.Grouper(freq='6H', key='ts')).count()
data_over_time.added_on.plot(
    kind='line',
    title="Data shared over time (-" + str(days_back) + "d)",
    ax=axes[2, 0]
)
# Plot by status label
data_frame.status.value_counts().plot(
    kind='pie',
    title="Threat Statuses (-" + str(days_back) + "d)",
    ax=axes[2, 1]
)
# Heatmap by type / source
owner_and_type = pd.DataFrame(data_frame[['name', 'type']])
owner_and_type['n'] = 1
grouped = owner_and_type.groupby(['name', 'type']).count().unstack('type').fillna(0)
ax = sns.heatmap(
    data=grouped['n'],
    robust=True,
    cmap="YlGnBu",
    ax=axes[3, 0]
)
# These require a little data munging
# translate a severity enum to a value
# TODO Add this translation to Pytx
def severity_value(severity):
    if severity == 'UNKNOWN': return 0
    elif severity == 'INFO': return 1
    elif severity == 'WARNING': return 3
    elif severity == 'SUSPICIOUS': return 5
    elif severity == 'SEVERE': return 7
    elif severity == 'APOCALYPSE': return 10
    return 0
# translate a value back to a severity enum
def value_severity(severity):
    if severity >= 9: return 'APOCALYPSE'
    elif severity >= 6: return 'SEVERE'
    elif severity >= 4: return 'SUSPICIOUS'
    elif severity >= 2: return 'WARNING'
    elif severity >= 1: return 'INFO'
    return 'UNKNOWN'
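# Sanity check: the two translations round-trip for every enum value, e.g.
#   value_severity(severity_value('SEVERE')) == 'SEVERE'
#   value_severity(severity_value('UNKNOWN')) == 'UNKNOWN'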
# Plot by how actionable the data is
# Build a special dataframe and chart it
data_frame['severity_value'] = data_frame.severity.apply(severity_value)
df2 = pd.DataFrame({'count' : data_frame.groupby(['name', 'confidence', 'severity_value']).size()}).reset_index()
ax = df2.plot(
    kind='scatter',
    x='severity_value', y='confidence',
    xlim=(-1, 11), ylim=(-10, 110),
    title='Data by Conf / Sev With Threshold Line',
    ax=axes[3, 1],
    s=df2['count'].apply(lambda x: 1000 * math.log10(x))
)
# Draw a threshold line for data we'd consider likely worth using for alerts (aka 'high value')
ax.plot([2,10], [100,0], c='red')
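If you want to keep a copy of the dashboard outside the notebook, matplotlib can write the entire figure to disk (the file name here is just an example):
In [ ]:
# Persist the full 4x2 dashboard as a PNG next to the notebook
fig.savefig('threat_exchange_dashboard.png', bbox_inches='tight')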
Take a subset of the data and understand it a little more.
In this example, we presume that we'd like to take phishing-related data and study it, to see if we can use it to better defend a corporate network or fight abuse in a product.
As a simple example, we'll filter down to data labeled MALICIOUS whose description contains the word 'phish', to see if we can draw a more detailed conclusion about how to apply the data to our existing internal workflows.
In [ ]:
from pytx.vocabulary import Status as s
phish_data = data_frame[(data_frame.status == s.MALICIOUS)
                        & data_frame.description.apply(lambda x: 'phish' in x if x is not None else False)]
# TODO: also filter for attack_type == PHISHING, when Pytx supports it
%matplotlib inline
# Setup subplots for our deeper dive plots
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(16,8))
# Heatmap of type / source
owner_and_type = pd.DataFrame(phish_data[['name', 'type']])
owner_and_type['n'] = 1
grouped = owner_and_type.groupby(['name', 'type']).count().unstack('type').fillna(0)
ax = sns.heatmap(
    data=grouped['n'],
    robust=True,
    cmap="YlGnBu",
    ax=axes[0]
)
# Tag breakdown of the top 10 tags
tags = pd.DataFrame([item for sublist in phish_data.tags for item in sublist])
tags[0].value_counts().head(10).plot(
    kind='pie',
    title="Top 10 Tags (-" + str(days_back) + "d)",
    ax=axes[1]
)
In [ ]:
from pytx.vocabulary import ReviewStatus as rs
# Define our threshold line, which is the same as the red threshold line in the chart above
sev_min = 2
sev_max = 10
conf_min = 0
conf_max = 100
# Build a new series to indicate whether a row passes our confidence + severity threshold.
# This is a 2-D cross-product test for which side of the line from
# (sev_min, conf_max) to (sev_max, conf_min) a point falls on.
def is_high_value(conf, sev):
    return (((sev_max - sev_min) * (conf - conf_max)) - ((conf_min - conf_max) * (sev - sev_min))) > 0
data_frame['is_high_value'] = data_frame.apply(lambda x: is_high_value(x.confidence, x.severity_value), axis=1)
# Filter down to just the data passing our criteria; you can add more here to filter by type, source, etc.
high_value_data = data_frame[data_frame.is_high_value
                             & (data_frame.status == s.MALICIOUS)
                             & (data_frame.review_status == rs.REVIEWED_MANUALLY)].reset_index(drop=True)
# Get a count of how much we kept
print("Kept %d of %d descriptors as high value" % (len(high_value_data), len(data_frame)))
# ... and preview it
high_value_data.head()
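To convince yourself the threshold test matches the red line drawn earlier, a quick spot check with illustrative values:
In [ ]:
# Points above the line pass; points on or below it fail
assert is_high_value(100, 10)     # max confidence, max severity
assert not is_high_value(0, 0)    # min confidence, min severity
assert not is_high_value(100, 2)  # exactly on the line at (sev=2, conf=100)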
Now, output all of the high-value data to a file as CSV or JSON, for consumption in our other systems and workflows.
In [ ]:
use_csv = False
if use_csv:
    file_name = 'threat_exchange_high_value.csv'
    high_value_data.to_csv(path_or_buf=file_name)
    print("CSV data written to %s" % file_name)
else:
    file_name = 'threat_exchange_high_value.json'
    high_value_data.to_json(path_or_buf=file_name, orient='index')
    print("JSON data written to %s" % file_name)
In [ ]: