We are interested in observing discussions on Twitter to identify vulnerabilities and exposures. We plan to focus on collecting tweets containing particular words of interest. The collected data can then be cleaned and analysed along the same lines as the full disclosure mailing list.
The notebook is inspired by the work in the article Vulnerability Disclosure in the Age of Social Media: Exploiting Twitter for Predicting Real-World Exploits, where the idea is to use Twitter analytics for early detection of exploits. The presentation shares instances where vulnerabilities were mentioned and discussed on Twitter before being officially disclosed, and this is the motivation for using Twitter as part of the research.
The researchers in the article collected tweets based on a list of 50 words. Since the list of words is not given in the article, we start our analysis by collecting tweets for keywords identified manually through our own research on vulnerability-related discussions on Twitter.
The following task can be achieved in two ways: through the Streaming API (Section 2.1) or the Search API (Section 2.2). First, the API credentials are loaded from a local file.
In [11]:
# Read the API credentials from Twitter_keys.txt, one "NAME=value" pair per line
myvars = {}
with open("Twitter_keys.txt") as myfile:
    for line in myfile:
        name, var = line.partition("=")[::2]
        myvars[name.strip()] = var
In [12]:
APP_KEY = myvars["APP_KEY"].rstrip()
APP_SECRET = myvars["APP_SECRET"].rstrip()
OAUTH_TOKEN = myvars["OAUTH_TOKEN"].rstrip()
OAUTH_TOKEN_SECRET = myvars["OAUTH_TOKEN_SECRET"].rstrip()
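For reference, Twitter_keys.txt is assumed to contain one NAME=value pair per line; the placeholder values below are hypothetical:
APP_KEY=your_app_key
APP_SECRET=your_app_secret
OAUTH_TOKEN=your_oauth_token
OAUTH_TOKEN_SECRET=your_oauth_token_secret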
A variable save_path is created which contains the path to the folder where the tweet files will be stored in JSON format. The folder is named in the form "tweet_M_D_YYYY", using the current month, day, and year without zero-padding.
In [13]:
import os
import datetime

now = datetime.datetime.now()
current_dir = os.getcwd()
# Folder named tweet_<month>_<day>_<year>, e.g. tweet_3_14_2016
save_path = os.path.join(current_dir, 'tweet_%i_%i_%i' % (now.month, now.day, now.year))
if not os.path.exists(save_path):
    os.makedirs(save_path)
We use the twython library for the tweet extraction. Twython is an actively maintained, pure-Python wrapper for the Twitter API, supporting both the regular (REST) and Streaming APIs. The primary task is to obtain the keys and tokens required to access the API, as done above, and then call the functions exposed by the wrapper.
The scripts below have two configurable parameters: query_word, the word or words to track, and max_tweets, the maximum number of tweets to collect. If query_word is initialized to multiple words, the code will retrieve the set of tweets that contain all of the words.
2.1. Streaming API
The Streaming API is used to access current tweets as they are posted. It returns approximately 1% of all tweets, i.e. about 60 tweets per second assuming a maximum of 6,000 tweets are posted every second. There is no request rate limit on the Streaming API; details of the streaming limits are available in the Twitter developer documentation.
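As an illustration (the phrase here is hypothetical), space-separated terms within a single track phrase are ANDed by the Streaming API, so the following would only match tweets containing both words:
In [ ]:
# Hypothetical multi-word query: a tweet must contain both terms to match
query_word = "security flaw"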
In [14]:
# Import libraries
from twython import TwythonStreamer
from twython import Twython, TwythonError
import time
import os

# Configurable parameters
query_word = "access-role"
max_tweets = 5

searched_tweets_strm = []

class MyStreamer(TwythonStreamer):
    def on_success(self, data):
        if len(searched_tweets_strm) < max_tweets:
            if 'text' in data:
                searched_tweets_strm.append(data['text'].encode('utf-8'))
            else:
                print("No tweets found")
                self.disconnect()
        else:
            print("Max tweets extracted")
            # Stop the stream cleanly instead of killing the interpreter
            self.disconnect()

    def on_error(self, status_code, data):
        print(status_code, data)
        print("Exception raised, waiting 15 minutes")
        time.sleep(15 * 60)

# Requires authentication as of Twitter API v1.1
stream = MyStreamer(APP_KEY, APP_SECRET,
                    OAUTH_TOKEN, OAUTH_TOKEN_SECRET)
stream.statuses.filter(track=query_word)
In [ ]:
print(searched_tweets_strm)
2.2. Search API
The Streaming API collects tweets from the current feed, not historical data. For historical data, the Search API can be used to access tweets that are up to three weeks old. Once these historical tweets have been collected, the Streaming API can be run continuously to gather new tweets going forward.
Limitation: to collect tweets for two different words, the Twitter API needs to be queried twice; there is no way to collect both sets in a single function call.
Example: if we need to collect tweets containing the word "data" and also tweets containing the word "flaw", the Search API must be called twice, once for "data" and once for "flaw" (see the sketch below).
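A minimal sketch of the two separate calls, using the Twython client twitter that is instantiated in the next cell:
In [ ]:
# Two separate Search API calls, one per keyword; there is no single
# call that returns both result sets together
results_data = twitter.search(q="data", count=100)
results_flaw = twitter.search(q="flaw", count=100)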
Note: the Search API limit is 180 requests every 15 minutes, hence the code below sleeps for 16 minutes (a small buffer past the rate-limit window) whenever the limit is reached.
In [ ]:
# Import libraries
from twython import Twython, TwythonError
import time
from time import gmtime, strftime
import json
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
import datetime
import os

# Configurable parameters
query_word = "CVE"
max_tweets = 800
tweet_cnt = 0

# Requires authentication as of Twitter API v1.1
twitter = Twython(APP_KEY, APP_SECRET, OAUTH_TOKEN, OAUTH_TOKEN_SECRET)

searched_tweets_srch = []
while len(searched_tweets_srch) < max_tweets:
    remaining_tweets = max_tweets - len(searched_tweets_srch)
    try:
        # The Search API returns at most 100 tweets per request
        search_results = twitter.search(q=query_word, count=min(remaining_tweets, 100))
        if not search_results["statuses"]:
            print("no tweets found")
            break
        tweet_cnt = tweet_cnt + len(search_results["statuses"])
        searched_tweets_srch.extend(search_results["statuses"])
    except TwythonError as e:
        print(e)
        print("exception raised, waiting 16 minutes")
        print(strftime("%H:%M:%S", gmtime()))
        time.sleep(16 * 60)

print("Total tweets extracted for " + query_word + ": " + str(tweet_cnt))
We will define a function save_tweets_json which takes save_path as an argument and stores each tweet as a separate .json file; for example, if you download 200 tweets, the folder will contain 200 files. Each file is named in the form <screen_name>_<tweet_id>.json.
In [ ]:
def save_tweets_json(save_path):
    # Write each tweet to its own file, named <screen_name>_<tweet_id>.json
    for tweet in searched_tweets_srch:
        file_name = tweet['user']['screen_name'] + '_' + str(tweet['id']) + '.json'
        with open(os.path.join(save_path, file_name), 'w') as tweet_file:
            tweet_file.write(json.dumps(tweet, indent=4))
            tweet_file.write("\n")

save_tweets_json(save_path)
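As a quick sanity check (a sketch, assuming at least one file was written), one of the saved files can be loaded back:
In [ ]:
# Load one saved tweet back to verify the round trip
import glob
sample = glob.glob(os.path.join(save_path, '*.json'))[0]
with open(sample) as tweet_file:
    tweet = json.load(tweet_file)
print(tweet['user']['screen_name'], tweet['text'])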
In [10]:
print(searched_tweets_srch)
References:
Sabottke, C., Suciu, O., and Dumitraș, T. "Vulnerability Disclosure in the Age of Social Media: Exploiting Twitter for Predicting Real-World Exploits." USENIX Security Symposium, 2015. https://www.umiacs.umd.edu/~tdumitra/papers/USENIX-SECURITY-2015.pdf
Blog post on predicting vulnerability exploits: http://www.umiacs.umd.edu/~tdumitra/blog/2015/08/02/predicting-vulnerability-exploits/