Here we provide code used to collect Twitter data for our AAAI paper.
Note that this is a long-running process (days) and may produce data that differs from that in the original paper. So, if you're interested in reproducing the results, I'd instead start with the data_processing.ipynb notebook.
For each brand in brands.json, we collect 300 followers. For each of these followers, we collect up to 5,000 of their friends (i.e., users they follow).
The results are stored in three pickle files:

username2brand.pkl: a dict from Twitter handle to brand demographics.
id2brand.pkl: a dict from Twitter user id to brand.
brand2counts.pkl: a dict from brand Twitter id to a Counter object. The Counter is a dict from Twitter ID to count, representing the number of times a follower of brand X is friends with user Y. E.g., {123: {456: 10, 789: 5}} means that of the users who follow 123, 10 also follow 456 and 5 also follow 789.

MongoDB is used to store data for intermediate computations. You may want to distribute calls to process_followers and process_friends for efficiency (a sketch of one way to do this follows the process_friends cell below).
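If you only want to sanity-check the outputs, the following is a minimal sketch (not part of the original pipeline) that loads the three pickles and prints one brand's most co-followed accounts. It assumes the files sit in the current directory, as written by the last cell of this notebook.

import pickle

# Load the three outputs described above.
username2brand = pickle.load(open('username2brand.pkl', 'rb'))
id2brand = pickle.load(open('id2brand.pkl', 'rb'))
brand2counts = pickle.load(open('brand2counts.pkl', 'rb'))

# Pick an arbitrary brand id and show the five accounts most often
# followed by that brand's sampled followers.
brand_id = brand2counts.keys()[0]
top_friends = sorted(brand2counts[brand_id].items(), key=lambda x: -x[1])[:5]
print 'brand', brand_id, 'top co-followed accounts:', top_friends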
In [1]:
from pymongo import MongoClient
dbconn = MongoClient('localhost', 27017)
db = dbconn.twitter_demographics
print db
In [5]:
import json
#insert twitter credentials into DB
twitter_cred = db['twitter_cred']
#remove all before insert
twitter_cred.remove({})
with open('../data/twitter-cred.json', 'r') as f:
    for line in f:
        user = json.loads(line)
        user['is_taken'] = False
        twitter_cred.insert(user)
print 'Inserted %d user credentials' % twitter_cred.count()
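Each line of ../data/twitter-cred.json is read as JSON; the key names below are inferred from the TwitterAPI constructor call later in the notebook, and the values are placeholders.

import json

# Hypothetical credentials line; only the key names are assumed (they match
# the TwitterAPI(...) call below), the values are fake.
example_cred = {
    'api_key': 'YOUR_API_KEY',
    'api_secret': 'YOUR_API_SECRET',
    'access_token_key': 'YOUR_ACCESS_TOKEN',
    'access_token_secret': 'YOUR_ACCESS_TOKEN_SECRET'
}
print json.dumps(example_cred)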
In [7]:
#insert brand details into db
brands = db['brands']
brands.remove({})
with open('../data/brands.json', 'r') as f:
    for line in f:
        brands.insert(eval(line))
print 'Inserted %d brands' % brands.count()
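The helper functions below query brands on is_processed and is_taken and read _id and brand_name, so each line of ../data/brands.json presumably carries at least those fields. The record below is a made-up illustration, not an actual entry from the data.

# Hypothetical brand record; field names are inferred from the queries
# below, the values are invented.
example_brand = {
    '_id': 123456789,             # the brand's Twitter user id
    'brand_name': 'examplebrand', # the brand's Twitter handle
    'is_processed': False,        # queried by get_brand_to_process
    'is_taken': False             # claim flag set when a worker picks up the brand
}
print example_brand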
In [8]:
#helper functions
import sys
import time

def get_credentials():
    """Gets Twitter credentials from the DB.
    Returns:
        Credentials dict, or None if none are available.
    """
    twitter_cred = db['twitter_cred']
    return twitter_cred.find_and_modify(query={'is_taken': False}, update={'$set': {'is_taken': True}}, upsert=False, sort=None)
def get_brand_to_process():
    """Gets an unprocessed brand from the DB and marks it as taken.
    Returns:
        brand dict or None
    """
    brandObj = db['brands']
    brand = brandObj.find_and_modify(query={'is_processed': False, 'is_taken': False}, update={'$set': {'is_taken': True}}, upsert=False, sort=None, full_response=False)
    if brand:
        brand['brand_id'] = brand.pop('_id')
    return brand
def get_follower_to_process():
    """Gets an unprocessed follower from the DB and marks it as taken.
    Returns:
        follower details dict or None
    """
    xmatrix = db['xmatrix']
    followersDtls = xmatrix.find_and_modify(query={'is_processed': False, 'is_taken': False}, update={'$set': {'is_taken': True}}, upsert=False, sort=None, full_response=False)
    if followersDtls:
        followersDtls['follower_id'] = followersDtls.pop('_id')
    return followersDtls
def add_followers_to_db(brand_id, followers_ids):
    """Adds followers to the DB.
    Args:
        brand_id: id of the brand that the users follow
        followers_ids: ids of the brand's followers
    """
    xmatrix = db['xmatrix']
    for follower_id in followers_ids:
        xmatrix.update({'_id': follower_id}, {'$addToSet': {'follows': brand_id}, '$set': {'is_processed': False, 'is_taken': False}}, upsert=True)
def add_friends_to_dB(follower_id, friends_list):
    """Adds friends to the DB.
    Args:
        follower_id: id of the follower
        friends_list: list of the follower's friends
    """
    xmatrix = db['xmatrix']
    xmatrix.update({'_id': follower_id},
                   {'$addToSet': {
                       'follows': {
                           '$each': friends_list
                       }
                   }})
def update_processed_flag(user_id):
    """Sets the processed flag to True.
    Args:
        user_id: user id for which the processed flag should be updated
    """
    xmatrix = db['xmatrix']
    xmatrix.update({'_id': user_id}, {'$set': {'is_taken': False, 'is_processed': True}})
def remove_user_from_x(user_id):
    """Removes a user from xmatrix.
    Args:
        user_id: user id
    """
    xmatrix = db['xmatrix']
    xmatrix.remove({'_id': user_id})
def robust_request(twitter, resource, params, max_tries=5):
    """If a Twitter request fails, sleep for 15 minutes.
    Do this at most max_tries times before quitting.
    Args:
        twitter .... A TwitterAPI object.
        resource ... A resource string to request.
        params ..... A parameter dictionary for the request.
        max_tries .. The maximum number of tries to attempt.
    Returns:
        A TwitterResponse object, or None if failed.
    """
    for i in range(max_tries):
        request = twitter.request(resource, params)
        if request.status_code == 200:
            return request
        r = [r for r in request][0]
        if ('code' in r and r['code'] == 34) or ('error' in r and r['error'] == 'Not authorized.'):  # 34 == user does not exist.
            print >> sys.stderr, 'skipping bad request', resource, params
            return None
        else:
            print >> sys.stderr, 'Got error:', request.text, '\nsleeping for 15 minutes.'
            sys.stderr.flush()
            time.sleep(60 * 15)
In [9]:
#get a TwitterAPI object using credentials from the DB
from TwitterAPI import TwitterAPI
twitter = get_credentials()
if twitter:
    twitterObj = TwitterAPI(
        twitter['api_key'],
        twitter['api_secret'],
        twitter['access_token_key'],
        twitter['access_token_secret'])
else:
    print >> sys.stderr, 'Twitter credentials not available'
In [10]:
def get_followers(user_id):
    """Gets followers of the given Twitter user id.
    Args:
        user_id... Twitter user id
    Returns:
        list of follower ids
    """
    followers = []
    request = robust_request(twitterObj, 'followers/ids',
                             {'user_id': user_id, 'count': 300})
    if request:
        for result in request:
            if 'ids' in result:
                followers += result['ids']
    return followers

def get_friends(user_id):
    """Gets friends of the given Twitter user id.
    Args:
        user_id... Twitter user id
    Returns:
        list of friend ids
    """
    friends = []
    request = robust_request(twitterObj, 'friends/ids',
                             {'user_id': user_id, 'count': 5000})
    if request:
        for result in request:
            if 'ids' in result:
                friends += result['ids']
    return friends
In [13]:
# For testing purposes; set to -1 to process all brands.
cut_off = 3
In [12]:
import sys
def process_followers():
    """Gets unprocessed brands from the DB,
    fetches 300 followers of each brand, and
    adds them to the DB.
    Halts when all brands are processed or cut_off is reached.
    """
    global cut_off
    while True:
        if cut_off == 0:
            print >> sys.stderr, 'cut off reached'
            break
        cut_off -= 1
        brand = get_brand_to_process()
        if not brand:
            print >> sys.stderr, 'No brands to process'
            break
        followers = get_followers(brand['brand_id'])
        if len(followers) > 0:
            add_followers_to_db(brand['brand_id'], followers)
            print 'added %d followers of %s to DB' % (len(followers), brand['brand_id'])
process_followers()
In [ ]:
# For testing purposes; set to -1 to process all followers.
cut_off = 3
In [14]:
import sys
def process_friends():
    """Gets an unprocessed follower, fetches up to
    5,000 of their friends, and adds them to the DB.
    Halts when all followers are processed or cut_off is reached.
    """
    global cut_off
    while True:
        if cut_off == 0:
            print >> sys.stderr, 'cut off reached'
            break
        cut_off -= 1
        follower = get_follower_to_process()
        if not follower:
            print >> sys.stderr, 'No follower to process'
            break
        friends = get_friends(str(follower['follower_id']))
        if len(friends) > 0:
            add_friends_to_dB(follower['follower_id'], friends)
            print 'added %d friends of %s to DB' % (len(friends), follower['follower_id'])
            update_processed_flag(follower['follower_id'])
        else:
            remove_user_from_x(follower['follower_id'])
            print >> sys.stderr, 'removed user; unable to fetch friends list for %s' % follower['follower_id']
process_friends()
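As mentioned in the introduction, collection goes much faster if these calls are distributed. The sketch below shows one possible way to do that on a single fork-based machine with multiprocessing; the worker function friend_worker is hypothetical, and it assumes the twitter_cred collection holds one unused credential set per worker. Each worker opens its own Mongo connection, claims credentials via get_credentials, builds its own TwitterAPI client, and runs process_friends until no followers are left; find_and_modify in get_follower_to_process ensures each follower is claimed by exactly one worker. This is an illustration, not the setup used for the paper.

from multiprocessing import Process
from pymongo import MongoClient
from TwitterAPI import TwitterAPI

def friend_worker():
    # One worker: open its own Mongo connection, claim one credential set,
    # build its own API client, and drain the queue of unprocessed followers.
    global db, twitterObj, cut_off
    db = MongoClient('localhost', 27017).twitter_demographics
    cred = get_credentials()
    if not cred:
        return
    twitterObj = TwitterAPI(cred['api_key'], cred['api_secret'],
                            cred['access_token_key'], cred['access_token_secret'])
    cut_off = -1  # run until there are no followers left to process
    process_friends()

# Spawn a few workers; each one claims followers atomically via find_and_modify.
workers = [Process(target=friend_worker) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()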
In [2]:
# Get all brands.
id2brand = dict()
username2brand = dict()
for brand in db.brands.find():
    id2brand[brand['_id']] = brand
    username2brand[brand['brand_name'].lower()] = brand
print 'read', len(id2brand), 'brands'
In [3]:
# Iterate over sampled followers for each brand.
from collections import Counter, defaultdict
# Count how often each friend appears so we can remove those occurring fewer than N times.
friend_counts = Counter()
count = 0
for follower in db.xmatrix.find({'is_processed': True}):
    friend_counts.update(follower['follows'])
    count += 1
    if count % 1000 == 0:
        print 'read', count
In [4]:
# Filter accounts not appearing a minimum number of times.
count_thresh = 100
friend_set = set(f for f, v in friend_counts.iteritems() if v >= count_thresh)
print len(friend_set), 'of', len(friend_counts), 'appear at least', count_thresh, 'times'
In [5]:
# Now construct friend counts for each brand, using the filtered set of accounts.
brand2counts = defaultdict(lambda: Counter())
count = 0
for follower in db.xmatrix.find({'is_processed': True}):
    count += 1
    if count % 1000 == 0:
        print 'read', count
    brandids = set([f for f in follower['follows'] if f in id2brand])
    friends = set(follower['follows']) & friend_set
    for b in brandids:
        brand2counts[b].update(friends)
In [6]:
# Print the top accounts for the first brand.
print brand2counts.keys()[0], sorted(brand2counts.values()[0].items(), key=lambda x: -x[1])[:5]
In [7]:
# Read the demographics data for each brand.
import json
username2demo = dict()
for line in open('../data/demo.json', 'rt'):
    js = json.loads(line)
    username2demo[js['twitter'].lower()] = js
print 'read', len(username2demo), 'demographics'
In [8]:
# Add demographics to each brand dict.
for username, brand in username2brand.iteritems():
    brand['demo'] = username2demo[username]
    if 'Female' not in brand['demo']:
        print brand['brand_name']  # , brand['demo']['Female']
In [9]:
# Set self reference counts to 0.
for brand in brand2counts:
    brand2counts[brand][brand] = 0.
# Now, the brand id should not appear in the count dict.
print brand2counts.keys()[0], sorted(brand2counts.values()[0].items(), key=lambda x: -x[1])[:5]
In [10]:
# Pickle everything.
import pickle
pickle.dump(username2brand, open('username2brand.pkl', 'wb'))
pickle.dump(id2brand, open('id2brand.pkl', 'wb'))
# Convert the defaultdict to a plain dict so it can be pickled
# (the lambda default factory is not picklable).
pickle.dump(dict([(b, c) for b, c in brand2counts.iteritems()]), open('brand2counts.pkl', 'wb'))