Yotam Shmargad
University of Arizona
Email: yotam@email.arizona.edu
Web: www.yotamshmargad.com
In [ ]:
# Install tweepy
# !pip install tweepy
In [ ]:
# Import the libraries we need
import tweepy
import json
import time
import networkx
import os
import matplotlib.pyplot as plt
from collections import Counter
In [ ]:
# Authenticate with the Twitter API.
# The four strings below look like placeholders; substitute real credentials.
consumer_key, consumer_secret = "Consumer Key", "Consumer Secret"
access_token, access_token_secret = "Access Token", "Access Token Secret"

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# Client object used for every request in the rest of the notebook
api = tweepy.API(auth)
In [ ]:
# Check working directory
# (files opened below with relative names are created in this directory)
os.getcwd()
In [ ]:
# Set working directory
# NOTE(review): the path below looks like a placeholder; replace it with an
# existing folder before running.
os.chdir('FOLDER FOR SAVING FILES')
In [ ]:
# Check working directory again to confirm the change took effect
os.getcwd()
In [ ]:
# Running totals of API requests made so far:
#   callsUT - user-timeline requests
#   callsRT - retweeter requests
callsUT = callsRT = 0
In [ ]:
# How many tweets to request per timeline:
#   E - for the ego account
#   A - for each alter (retweeter) account
E, A = 10, 10
In [ ]:
# Request E tweets from an account that exists and has tweets, asking the
# API to leave out retweets and replies.
ego = api.user_timeline(
    screen_name="CUBoulder",
    count=E,
    include_rts=False,
    exclude_replies=True,
)
callsUT += 1  # one more user-timeline request made
In [ ]:
# Number of tweets that came back for the account above
len(ego)
In [ ]:
# Same request for an account that exists but has no tweets
# (the result is expected to be empty).
ego = api.user_timeline(
    screen_name="DeveloperYotam",
    count=E,
    include_rts=False,
    exclude_replies=True,
)
callsUT += 1  # one more user-timeline request made
In [ ]:
# Number of tweets that came back for the account with no tweets
len(ego)
In [ ]:
# Same request for a screen name that does not exist.
# NOTE(review): this request presumably raises, in which case the counter
# line below never runs for it - confirm when executing.
ego = api.user_timeline(
    screen_name="fakeuserq4587937045",
    count=E,
    include_rts=False,
    exclude_replies=True,
)
callsUT += 1
In [ ]:
# Handling errors: a failed request must neither stop the notebook nor leave
# later cells without a result to work with.
ego = []
# Default result. Without it a failed request leaves ego_raw undefined (or
# holding data from an earlier run), and the conversion to JSON that follows
# would raise NameError or silently reuse stale tweets.
ego_raw = []
egosn = "CUBoulder"
try:
    ego_raw = api.user_timeline(
        screen_name=egosn,
        count=E,
        include_rts=False,
        exclude_replies=True,
    )
except tweepy.TweepError:
    print("fail!")
# Count the request whether or not it succeeded
callsUT += 1
In [ ]:
# Keep only the JSON payload of each returned status object
ego = [status._json for status in ego_raw]
In [ ]:
# Saving ego's tweets as JSON in the working directory
with open('egotweet.json', 'w') as outfile:
    outfile.write(json.dumps(ego))
In [ ]:
# User-timeline requests counted so far
callsUT
In [ ]:
# Looking at a json object: the first of ego's tweets
# (raises IndexError if no tweets were returned)
ego[0]
In [ ]:
# Accessing an element of ego tweets: the id of the first tweet, as a string
ego[0]["id_str"]
In [ ]:
# Storing ego's user id (the author of the tweets pulled above).
# The retweeter loop compares egoid with each retweeter's user id and uses it
# as the ego node of every edge, so it has to be a user id; a tweet id would
# never match a retweeter and would label the ego node with the wrong value.
egoid = ego[0]["user"]["id_str"]
In [ ]:
# Collect the id and retweet count of every ego tweet (two parallel lists),
# then print them side by side, one tweet per line.
tweetids = [tweet["id_str"] for tweet in ego]
retweets = [tweet["retweet_count"] for tweet in ego]
for tweet_id, retweet_count in zip(tweetids, retweets):
    print(tweet_id, retweet_count)
In [ ]:
# Fetch the retweets of ego's first tweet
first_tweet_id = ego[0]["id_str"]
egort = api.retweets(first_tweet_id)
callsRT += 1  # one more retweeter request made
In [ ]:
# Number of retweets returned for that tweet
len(egort)
In [ ]:
# Retweeter requests counted so far
callsRT
In [ ]:
# Ask for the retweets of an id that is not a real tweet
bogus_tweet_id = "garblegarble"
egort = api.retweets(bogus_tweet_id)
callsRT += 1
In [ ]:
# Note: callsRT did not increase in the last command
# (presumably the failed request raised before the increment could run)
callsRT
In [ ]:
# Count the failed request by hand.
# NOTE(review): assumes the previous request stopped before its own
# increment ran - otherwise this double-counts.
callsRT += 1
In [ ]:
# Sleep for 10 seconds
# (the notebook is blocked until the pause is over)
time.sleep(10)
In [ ]:
# Collecting retweeters of ego tweets
# Results, all aligned with the order of `ego`:
allretweeters = []  # one (egoid, retweeter user id) pair per retweet
self = []           # per tweet: 1 if some retweet's user id equals egoid
check = []          # per tweet: number of retweeters recorded (0 on failure)

# Requests made since the last pause. It starts from the running total, so
# the first pause happens exactly when the total first reaches 75. It is then
# reset after each pause; testing the ever-growing total instead would make
# every remaining request wait another 900 seconds.
rt_calls_since_pause = callsRT

for egotweet in ego:
    retweeters = []
    selftweet = 0

    if rt_calls_since_pause >= 75:
        # 900 seconds = 15 minutes (presumably the API's rate-limit window)
        time.sleep(900)
        rt_calls_since_pause = 0

    try:
        egort_raw = api.retweets(egotweet["id_str"])
    except tweepy.TweepError:
        # Record placeholders so check/self stay aligned with ego
        check.append(0)
        self.append(0)
    else:
        egort = [egoretweet._json for egoretweet in egort_raw]
        for retweet in egort:
            retweeter_id = retweet["user"]["id_str"]
            if retweeter_id != egoid:
                allretweeters.append((egoid, retweeter_id))
                retweeters.append(retweeter_id)
            else:
                selftweet = 1
        check.append(len(retweeters))
        self.append(selftweet)

    # callsRT stays a running total; failed requests are counted as well
    callsRT += 1
    rt_calls_since_pause += 1
In [ ]:
# Saving the three result lists as JSON files in the working directory
# (the tuples in allretweeters are written as JSON arrays)
for filename, payload in (
    ('check.json', check),
    ('self.json', self),
    ('allretweeters.json', allretweeters),
):
    with open(filename, 'w') as file:
        json.dump(payload, file)
In [ ]:
# One line per ego tweet: tweet id, retweet count reported by the API,
# number of retweeters actually obtained, and the self-retweet flag.
# zip stops at the shortest of the four lists.
for row in zip(tweetids, retweets, check, self):
    print(*row)
In [ ]:
# Total number of (ego, retweeter) pairs collected
len(allretweeters)
In [ ]:
# The collected (ego, retweeter) pairs themselves
allretweeters
In [ ]:
# Edge weight = how many times the same (ego, retweeter) pair occurs,
# i.e. how many of ego's tweets that user retweeted
weight = Counter((source, target) for source, target in allretweeters)
In [ ]:
# Edge -> weight mapping for ego's retweeters
weight
In [ ]:
# List of ((source, target), weight) entries, one per distinct edge
weighted_edges = [(edge, count) for edge, count in weight.items()]
In [ ]:
# The weighted edge list
weighted_edges
In [ ]:
# Build an undirected graph containing one edge per (ego, retweeter) pair;
# the weights are not stored on the graph, only the endpoints.
ego_edge_pairs = [edge for edge, _count in weighted_edges]
G = networkx.Graph()
G.add_edges_from(ego_edge_pairs)
In [ ]:
# Draw the ego network, using each edge's weight as its line width
ego_edge_widths = [count for _edge, count in weighted_edges]
networkx.draw(G, width=ego_edge_widths)
In [ ]:
# The retweeter end of every distinct edge. Each edge appears once in
# weighted_edges, so a retweeter is listed once per distinct source.
unique = [retweeter for (_source, retweeter), _count in weighted_edges]
In [ ]:
# Number of retweeters whose timelines will be requested
len(unique)
In [ ]:
# The retweeter ids themselves
unique
In [ ]:
# User-timeline requests counted before the alter timelines are pulled
callsUT
In [ ]:
# Collecting and storing the tweets of retweeters
alter = []   # tweets (JSON payloads) of the retweeter handled most recently
alters = []  # one list of tweets per retweeter whose request succeeded

# Requests made since the last pause. It starts from the running total, so
# the first pause happens exactly when the total first reaches 900. It is
# then reset after each pause; testing the ever-growing total instead would
# make every remaining request wait another 900 seconds.
ut_calls_since_pause = callsUT

for retweeter in unique:
    if ut_calls_since_pause >= 900:
        # 900 seconds = 15 minutes (presumably the API's rate-limit window)
        time.sleep(900)
        ut_calls_since_pause = 0

    try:
        # Pass the id explicitly as a user id so it cannot be taken for a
        # screen name
        alter_raw = api.user_timeline(
            user_id=retweeter,
            count=A,
            include_rts=False,
            exclude_replies=True,
        )
    except tweepy.TweepError:
        # Nothing is appended for a failed retweeter, so alters can be
        # shorter than unique
        print("fail!")
    else:
        alter = [altertweet._json for altertweet in alter_raw]
        alters.append(alter)

    # callsUT stays a running total; failed requests are counted as well
    callsUT += 1
    ut_calls_since_pause += 1
In [ ]:
# Saving every retweeter's tweets as JSON in the working directory
with open('alters.json', 'w') as outfile:
    outfile.write(json.dumps(alters))
In [ ]:
# User-timeline requests counted after the alter timelines were pulled
callsUT
In [ ]:
# Number of retweeters whose timeline request succeeded
len(alters)
In [ ]:
# How many tweets came back for each retweeter, one number per line
for tweet_count in map(len, alters):
    print(tweet_count)
In [ ]:
# Flatten the per-retweeter timelines into three parallel lists (author id,
# tweet id, retweet count) and print one line per tweet as it is stored.
altids, alttweetids, altretweets = [], [], []
for timeline in alters:
    for tweet in timeline:
        author_id = tweet["user"]["id_str"]
        tweet_id = tweet["id_str"]
        retweet_count = tweet["retweet_count"]
        altids.append(author_id)
        alttweetids.append(tweet_id)
        altretweets.append(retweet_count)
        print(author_id, tweet_id, retweet_count)
In [ ]:
# Collecting retweeters of alter tweets
# Results, aligned with the order in which alter tweets are visited:
allalt = []    # one (alter user id, retweeter user id) pair per retweet
altself = []   # per tweet: 1 if the alter retweeted their own tweet, else 0
altcheck = []  # per tweet: number of retweeters recorded (0 on failure)

# Requests made since the last pause. It starts from the running total, so
# the first pause happens exactly when the total first reaches 75. It is then
# reset after each pause; testing the ever-growing total instead would make
# every remaining request wait another 900 seconds.
rt_calls_since_pause = callsRT

for alt in alters:
    for alttweet in alt:
        altid = alttweet["user"]["id_str"]
        # Rebuilt for every tweet; after the loops it holds only the
        # retweeters of the last tweet visited
        altretweeters = []
        selftweet = 0

        if rt_calls_since_pause >= 75:
            # 900 seconds = 15 minutes (presumably the API's rate-limit window)
            time.sleep(900)
            rt_calls_since_pause = 0

        try:
            altrt_raw = api.retweets(alttweet["id_str"])
        except tweepy.TweepError:
            # Record placeholders so altcheck/altself stay aligned with the
            # tweets visited
            altcheck.append(0)
            altself.append(0)
        else:
            altrt = [altretweet._json for altretweet in altrt_raw]
            for retweet in altrt:
                retweeter_id = retweet["user"]["id_str"]
                if retweeter_id != altid:
                    allalt.append((altid, retweeter_id))
                    altretweeters.append(retweeter_id)
                else:
                    selftweet = 1
            altcheck.append(len(altretweeters))
            altself.append(selftweet)

        # callsRT stays a running total; failed requests are counted as well
        callsRT += 1
        rt_calls_since_pause += 1
In [ ]:
# Saving the alter-level results as JSON files in the working directory
def _write_json(filename, payload):
    """Serialise *payload* as JSON into *filename*, replacing any old file."""
    with open(filename, 'w') as file:
        json.dump(payload, file)


_write_json('altcheck.json', altcheck)
_write_json('altself.json', altself)
# NOTE(review): altretweeters is rebuilt for every alter tweet in the
# collection loop, so this file only holds the retweeters of the last tweet
# visited (and the name is undefined if no alter tweet was visited) -
# confirm this output is intended.
_write_json('altretweeters.json', altretweeters)
_write_json('allalt.json', allalt)
In [ ]:
# One line per alter tweet: author id, tweet id, retweet count reported by
# the API, number of retweeters actually obtained, and the self-retweet flag.
# zip stops at the shortest of the five lists.
for row in zip(altids, alttweetids, altretweets, altcheck, altself):
    print(*row)
In [ ]:
# Total number of (alter, retweeter) pairs collected
len(allalt)
In [ ]:
# The collected (alter, retweeter) pairs themselves
allalt
In [ ]:
# Edge weight for the alter network: how many times each (alter, retweeter)
# pair occurs. This rebinds `weight`; the ego weights live on in
# weighted_edges.
weight = Counter((source, target) for source, target in allalt)
In [ ]:
# Edge -> weight mapping for the alters' retweeters
weight
In [ ]:
# Ego edges first, then alter edges, each as ((source, target), weight)
all_edges = [*weighted_edges, *weight.items()]
In [ ]:
# The combined weighted edge list
all_edges
In [ ]:
# Build an undirected graph from the endpoints of every ego and alter edge.
# Being undirected, (a, b) and (b, a) end up as the same edge.
full_edge_pairs = [edge for edge, _count in all_edges]
G = networkx.Graph()
G.add_edges_from(full_edge_pairs)
In [ ]:
# Visualizing the full network
# networkx draws edges in G.edges() order, which is not the order of
# all_edges, and G holds a single edge where all_edges has both (a, b) and
# (b, a). Widths are therefore looked up per drawn edge instead of being
# taken positionally from all_edges.
edge_weight = Counter()
for (source, target), count in all_edges:
    # Direction is ignored (G is undirected); reciprocal retweets add up
    edge_weight[frozenset((source, target))] += count

networkx.draw(G, width=[edge_weight[frozenset(edge)] for edge in G.edges()])