Installation

Run in a terminal:

pip3 install tweepy

Or clone the project from GitHub and follow the installation instructions there: https://github.com/tweepy/tweepy

Connecting to the Twitter API

Get your keys here: https://apps.twitter.com/


In [ ]:
# Credentials for the Twitter API -- fill these in before running the cells below.
ckey = ''     # consumer key (passed to tweepy.OAuthHandler)
csecret = ''  # consumer secret (passed to tweepy.OAuthHandler)
atoken = ''   # access token (passed to auth.set_access_token)
asecret = ''  # access token secret (passed to auth.set_access_token)

In [ ]:
import tweepy

# Create the "keychain": an OAuth handler built from the consumer key/secret,
# with the access token/secret attached afterwards.
auth = tweepy.OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

# Create the API object that the following cells use for all requests.
# NOTE(review): the two wait_on_rate_limit* flags presumably make tweepy pause
# (and print a notice) when a rate limit is reached -- confirm against the
# documentation of the installed tweepy version.

api = tweepy.API(auth, wait_on_rate_limit = True, wait_on_rate_limit_notify = True)

In [ ]:
# Test the connection: request 5 entries of the authenticated account's home
# timeline and print the text of each one.

public_tweets = api.home_timeline(count = 5)

for tweet in public_tweets:
        print(tweet.text)

In [ ]:
# Print Axel's last 5 tweets: request 5 entries of the timeline of the account
# 'snurb_dot_info' and print the text of each one.

axels_timeline = api.user_timeline(id = 'snurb_dot_info', count = 5)

for x in axels_timeline: 
    print(x.text)

Example: Track a keyword

(Unfortunately, this currently requires a manual fix to the tweepy module; an updated release should follow soon. See: https://github.com/tweepy/tweepy/issues/615)


In [ ]:
import tweepy

# Credentials of a second app -- fill these in before running this cell.
# They are kept separate from ckey/csecret/atoken/asecret of the first app.
ckey2 = ''     # consumer key of the second app
csecret2 = ''  # consumer secret of the second app
atoken2 = ''   # access token of the second app
asecret2 = ''  # access token secret of the second app

# Create the "keychain" for the second app.
auth2 = tweepy.OAuthHandler(ckey2, csecret2)
auth2.set_access_token(atoken2, asecret2)

# Create the API object for the second app; its .auth attribute is what the
# streaming cell below hands to tweepy.Stream.

api2 = tweepy.API(auth2, wait_on_rate_limit = True, wait_on_rate_limit_notify = True)

In [ ]:
class MyStreamListener(tweepy.StreamListener):
    """Listener that defines what happens for each event of the stream.

    Incoming statuses are echoed to standard output; errors are reported by
    their code.
    """

    def on_status(self, status):
        """Print the author's screen name followed by the text of *status*."""
        author = status.user.screen_name
        line = ''.join(['@', author, ': ', status.text])
        print(line)

    def on_error(self, status_code):
        """Print *status_code* and return False.

        NOTE(review): returning False presumably tells tweepy to stop the
        stream instead of reconnecting -- confirm against the tweepy docs.
        """
        message = 'error code = %s' % status_code
        print(message)
        return False


# Create a listener of the kind defined by MyStreamListener.
myListener = MyStreamListener() 

# Use the listener to create a stream that authenticates with the second app.
myStream = tweepy.Stream(auth = api2.auth, listener=myListener) 
    
try:
    # Access the stream, tracking the keyword 'auspol'. The listener prints
    # every status it receives.
    myStream.filter(track=['auspol']) 
except KeyboardInterrupt:
    # Interrupting the run (e.g. stopping the cell) closes the stream cleanly.
    myStream.disconnect()
    print('\nGoodbye!')

Example: Gather the following network of a list of users


In [ ]:
import time

user = 'flxvctr' # screen name of the user whose friends should be collected

def get_friend_ids(username):
    """Return all friend ids of the account with screen name *username*.

    The ids are fetched by paging through ``api.friends_ids`` with a
    ``tweepy.Cursor`` and are returned as a plain list.
    """
    cursor = tweepy.Cursor(api.friends_ids, screen_name=username)
    return list(cursor.items())

def get_screen_name_and_created_at(uids):
    """Look up screen name and creation time for a list of user ids.

    The ids are sent to ``api.lookup_users`` in batches of 100.

    Args:
        uids: list of numeric user ids.

    Returns:
        A list of ``[user_id, screen_name, created_at]`` rows, one per user
        object returned by the API, where ``created_at`` is an integer Unix
        timestamp. Ids for which the API returns no user object are omitted.
    """
    # Function-scope import so this definition works on its own.
    import calendar

    batch_size = 100
    results = []

    # Step through the ids batch by batch. Advancing by the batch size (not by
    # the number of users returned) guarantees progress even when the API
    # returns fewer users than were requested.
    for start in range(0, len(uids), batch_size):
        batch = uids[start:start + batch_size]
        for account in api.lookup_users(user_ids=batch):
            # created_at is expressed in UTC, so it must be converted with
            # calendar.timegm; time.mktime would interpret it as local time.
            created = calendar.timegm(account.created_at.timetuple())
            # Take the id from the returned object: the response may omit
            # users or order them differently than the request, so position
            # in ``uids`` is not a reliable label.
            results.append([account.id, account.screen_name, created])

    return results

# Collect the friend ids of `user`, then resolve them to screen names and
# creation times.
friendlist = get_screen_name_and_created_at(get_friend_ids(user))

# Show how many rows were collected, followed by the rows themselves.
print(len(friendlist),friendlist)

And here's a script that reads its input from and writes its results to CSV files (with error handling etc.)


In [ ]:
# Usage: 
# insert consumer and app keys below
# run (eg in Terminal): python3 scriptname.py inputfile.csv outputfile.csv
# inputfile is a list of userids

import time
import csv
import io
import tweepy
from os import stat
from sys import argv

# Credentials: uncomment and fill in the four lines below if ckey, csecret,
# atoken and asecret are not already defined (e.g. when running this as a
# standalone script).
#ckey = ''
#csecret = ''
#atoken = ''
#asecret = ''

# Build the OAuth handler and the API object used by the functions below.
auth = tweepy.OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

api = tweepy.API(auth, wait_on_rate_limit = True, wait_on_rate_limit_notify = True)

# For script use, uncomment the next two lines to take the input and output
# paths from the command line.
# NOTE: the two hard-coded assignments further down run afterwards and would
# overwrite these values, so remove or comment them out in that case.
# userlistpath = argv[1]
# friendlistpath = argv[2]

# Otherwise set the two paths here: the CSV of user ids to read and the file
# the friend list is appended to.
userlistpath = '' 
friendlistpath = ''

def get_friend_ids(userid):
    """Collect the friend ids of one user, tolerating API errors.

    Args:
        userid: numeric user id, as an int or a numeric string (it is
            converted with ``int``).

    Returns:
        A list of friend ids. If the API raises a ``tweepy.TweepError`` while
        paging, the ids gathered so far are kept and a negative error marker
        is appended as the last element: the negated HTTP status code of the
        failed request, or -1 when no code can be determined.

    Raises:
        ValueError: if *userid* cannot be converted to an integer.
    """
    userid = int(userid)

    uids = tweepy.Cursor(api.friends_ids, user_id=userid).items()

    idlist = []
    idcount = 0

    try:
        for uid in uids:
            idlist.append(uid)
            idcount += 1
    except tweepy.TweepError as e:
        # Prefer the status code of the HTTP response attached to the error.
        # The error text only sometimes ends in the three-digit code (it can
        # also be a message such as "Not authorized."), so parsing it is a
        # fallback and must never raise from inside the error handler.
        response = getattr(e, 'response', None)
        code = getattr(response, 'status_code', None)
        if code is None:
            try:
                code = int(str(e.reason)[-3:])
            except ValueError:
                code = None
        if not code:
            code = 1  # unknown error; still yields a negative marker
        # abs() guarantees the marker is negative whatever was parsed.
        idlist.append(-abs(code))
        print(e.reason + ' for %d' % userid)

    # The count covers real ids only, not the error marker.
    print('%d friend ids collected for %d.' % (idcount, userid))

    return idlist


def get_screen_name_and_created_at(uids):
    """Resolve friend ids to screen names and creation times.

    Non-negative entries of *uids* are sent to ``api.lookup_users`` in batches
    of 100. Negative entries are error markers (as appended by
    ``get_friend_ids``) and are never sent to the API.

    Args:
        uids: list of integer user ids, optionally containing negative error
            markers.

    Returns:
        A list of three-element rows, in this order:

        * one row per non-negative id, in input order:
          ``[id, screen_name, created_at]`` with ``created_at`` as an integer
          Unix timestamp; ``[id, -code, -code]`` if the batch request failed
          (``code`` is the HTTP status code, or 1 when unknown); or
          ``[id, -404, -404]`` if the request succeeded but returned no user
          object for that id;
        * then one row ``[marker, marker, marker]`` per negative marker.
    """
    # Function-scope import so this definition works on its own.
    import calendar

    batch_size = 100
    # users/lookup answers with HTTP 404 when none of the requested ids can be
    # resolved; the same code marks single ids missing from a partial answer.
    missing_code = 404

    # Separate real ids from error markers. A marker may sit anywhere in the
    # list (first if the very first request failed, last if a later page
    # failed), and must not be looked up as if it were a user id.
    friend_ids = [uid for uid in uids if uid >= 0]
    markers = [uid for uid in uids if uid < 0]

    results = []

    for start in range(0, len(friend_ids), batch_size):
        batch = friend_ids[start:start + batch_size]
        try:
            users = api.lookup_users(user_ids=batch)
        except tweepy.TweepError as e:
            # Prefer the status code of the attached HTTP response; the error
            # text does not reliably end in the three-digit code.
            response = getattr(e, 'response', None)
            code = getattr(response, 'status_code', None)
            if code is None:
                try:
                    code = int(str(e.reason)[-3:])
                except ValueError:
                    code = None
            code = abs(code) if code else 1
            for uid in batch:
                results.append([uid, -code, -code])
        else:
            # Index the answer by id: the response may omit users and is not
            # guaranteed to follow the order of the request.
            details = {}
            for account in users:
                # created_at is expressed in UTC, so convert with timegm;
                # time.mktime would interpret it as local time.
                created = calendar.timegm(account.created_at.timetuple())
                details[account.id] = (account.screen_name, created)
            for uid in batch:
                if uid in details:
                    name, created = details[uid]
                    results.append([uid, name, created])
                else:
                    results.append([uid, -missing_code, -missing_code])

    for marker in markers:
        results.append([marker] * 3)

    print('%d friend details collected.' % len(friend_ids))

    return results


# Opens the input CSV, calls the functions above with its contents and writes
# the results to another file.
def write_edge_list(readpath, writepath):
    """Append the friends of every user listed in *readpath* to *writepath*.

    Args:
        readpath: path of a UTF-8 CSV file whose first column holds user ids.
            Rows that are empty or whose first field is blank are skipped.
        writepath: path of the output file. It is opened in append mode and
            written tab-delimited; a header row is written only if the file
            is empty when the function starts.

    Each output row is ``user, friend_id, friend_name, friend_created``.
    """
    with io.open(readpath, encoding='utf_8', newline='') as userlist, \
            io.open(writepath, mode='a', encoding='utf_8',
                    newline='') as friendlist:
        users = csv.reader(userlist)
        friends = csv.writer(friendlist, delimiter='\t', lineterminator='\n')

        # Only a new or empty output file gets the header, so repeated runs
        # can keep appending to the same file.
        if stat(writepath).st_size == 0:
            friends.writerow(['user', 'friend_id', 'friend_name', 'friend_created'])

        n = 0

        for row in users:
            # csv.reader yields [] for blank lines; skip those (and rows with
            # a blank first field) instead of failing on row[0].
            if not row or not row[0].strip():
                continue
            userid = row[0]
            print('Friends of %d users gathered so far. Goin on.' % n)
            friendids = get_friend_ids(userid)
            frienddetails = get_screen_name_and_created_at(friendids)
            for details in frienddetails:
                friends.writerow([userid, details[0], details[1], details[2]])
            # Push each user's rows to disk right away so a long run that is
            # cut short loses as little as possible.
            friendlist.flush()
            n += 1

    print('Friends of %d users gathered. Pew. That was exciting.' % n)



# Run the collection with the two paths configured above.
write_edge_list(userlistpath,friendlistpath)