This notebook demonstrates using PySpark to analyze tweets stored in ElasticSearch.

Setting up the ElasticSearch and Spark cluster is described in https://github.com/justinlittman/TweetSets.

To run pyspark-notebook:

  1. Get a copy of the elasticsearch-hadoop jar (elasticsearch-hadoop-6.2.2.jar).
  2. Run the following, adjusting the mounted directories and ports as necessary:

       docker run -it --rm -p 8888:8888 --net=host --pid=host \
         -e TINI_SUBREAPER=true \
         -v ~/notebooks:/home/jovyan/work \
         -v ~/elasticsearch-hadoop-6.2.2.jar:/home/jovyan/elasticsearch-hadoop-6.2.2.jar \
         jupyter/pyspark-notebook

A few notes:

  • pyspark-notebook uses Python 3.6 and Spark 2.3, and PySpark requires the driver and the workers to run the same Python minor version. For the Spark cluster, gettyimages/spark was therefore customized to be based on python:3.6-jessie, because by default it uses Python 3.4.
  • Spark's networking is confusing and relies heavily on randomly assigned ports, which doesn't play well with Docker. I think the configuration below gets it right.

Create the Spark Context.


In [1]:
import os
import pyspark

# Add the elasticsearch-hadoop jar
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/elasticsearch-hadoop-6.2.2.jar pyspark-shell'
conf = pyspark.SparkConf()

# Point to the master.
conf.setMaster("spark://tweetsets.library.gwu.edu:7101")
conf.setAppName("pyspark-elasticsearch-demo")
conf.set("spark.driver.bindAddress", "0.0.0.0")
# Don't hog all of the cores.
conf.set("spark.cores.max", "3")
# Specify a port for the block manager (which runs as part of the worker). The range 7003-7028 is set 
# to be open in the Spark worker container.
conf.set("spark.blockManager.port", "7004")

# create the context
sc = pyspark.SparkContext(conf=conf)
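
The comment above says that ports 7003-7028 are open in the worker container. The sketch below is one way to check that a chosen block manager port stays inside that range. It assumes Spark's documented retry behaviour, where a port that is already in use is retried at the next higher number up to spark.port.maxRetries times (16 by default). The helper names are hypothetical and not part of Spark.

```python
# Range taken from the comment in the cell above: 7003-7028 inclusive.
OPEN_PORTS = range(7003, 7029)


def candidate_ports(base_port, max_retries=16):
    """Ports that may be tried: the base port plus one more per retry.

    Assumption: Spark increments the port by one on each bind retry.
    """
    return range(base_port, base_port + max_retries + 1)


def fits_open_range(base_port, max_retries=16, open_ports=OPEN_PORTS):
    """True if every port Spark might bind to lies inside the open range."""
    ports = candidate_ports(base_port, max_retries)
    return ports[0] in open_ports and ports[-1] in open_ports


print(fits_open_range(7004))  # True: the last candidate is 7020
print(fits_open_range(7020))  # False: the last candidate is 7036
```

With the value used in this notebook (7004), every candidate port stays inside the open range.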

Using RDD

Create an RDD from the ElasticSearch index.


In [2]:
# Configure for ElasticSearch cluster and index.
es_conf = {"es.nodes": "tweetsets.library.gwu.edu",
           "es.port": "9200",
           "es.resource": "tweets-ba2157/doc"}

# Each element of the RDD is an (id, document) pair.
tweets_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)

Retrieve the first element from the RDD.


In [3]:
tweets_rdd.first()


Out[3]:
('991588546514440193',
 {'hashtags': ('savethe8th',),
  'tweet_id': '991588546514440193',
  'retweeted_quoted_user_id': '780395768595288064',
  'created_at': '2018-05-02T08:01:46+00:00',
  'mention_user_ids': ('780395768595288064',),
  'retweet_count': 0,
  'mention_screen_names': ('SarahEiruFodla',),
  'user_follower_count': 450,
  'urls': (),
  'retweet_quoted_status_id': '991443480105930752',
  'user_language': 'en',
  'favorite_count': 0,
  'text': ('RT @SarahEiruFodla: Only when I see supporters of #savethe8th hand out free condoms/pills, deliver realistic sex education, offer quality l…',),
  'user_verified': False,
  'user_utc_offset': '3600',
  'retweeted_quoted_screen_name': 'SarahEiruFodla',
  'has_geo': False,
  'user_screen_name': 'GeorgiaTraceyy',
  'dataset_id': 'ba2157',
  'has_media': False,
  'tweet': '{"quote_count": 0, "contributors": null, "truncated": false, "text": "RT @SarahEiruFodla: Only when I see supporters of #savethe8th hand out free condoms/pills, deliver realistic sex education, offer quality l\\u2026", "is_quote_status": false, "in_reply_to_status_id": null, "reply_count": 0, "id": 991588546514440193, "favorite_count": 0, "entities": {"user_mentions": [{"id": 780395768595288064, "indices": [3, 18], "id_str": "780395768595288064", "screen_name": "SarahEiruFodla", "name": "Eir\\u00fa_F\\u00f3dla\\u2122"}], "symbols": [], "hashtags": [{"indices": [50, 61], "text": "savethe8th"}], "urls": []}, "retweeted": false, "coordinates": null, "timestamp_ms": "1525248106970", "source": "<a href=\\"http://twitter.com/download/android\\" rel=\\"nofollow\\">Twitter for Android</a>", "in_reply_to_screen_name": null, "id_str": "991588546514440193", "retweet_count": 0, "in_reply_to_user_id": null, "favorited": false, "retweeted_status": {"quote_count": 2, "contributors": null, "truncated": true, "text": "Only when I see supporters of #savethe8th hand out free condoms/pills, deliver realistic sex education, offer quali\\u2026 https://t.co/ttT3luYSRF", "is_quote_status": false, "in_reply_to_status_id": null, "reply_count": 4, "id": 991443480105930752, "favorite_count": 158, "entities": {"user_mentions": [], "symbols": [], "hashtags": [{"indices": [30, 41], "text": "savethe8th"}], "urls": [{"url": "https://t.co/ttT3luYSRF", "indices": [117, 140], "expanded_url": "https://twitter.com/i/web/status/991443480105930752", "display_url": "twitter.com/i/web/status/9\\u2026"}]}, "retweeted": false, "coordinates": null, "source": "<a href=\\"https://mobile.twitter.com\\" rel=\\"nofollow\\">Twitter Lite</a>", "in_reply_to_screen_name": null, "id_str": "991443480105930752", "retweet_count": 54, "in_reply_to_user_id": null, "favorited": false, "user": {"follow_request_sent": null, "profile_use_background_image": true, "default_profile_image": false, "id": 
780395768595288064, "default_profile": true, "verified": false, "profile_image_url_https": "https://pbs.twimg.com/profile_images/979507130343411712/ZraoxJD3_normal.jpg", "profile_sidebar_fill_color": "DDEEF6", "profile_text_color": "333333", "followers_count": 580, "profile_sidebar_border_color": "C0DEED", "id_str": "780395768595288064", "profile_background_color": "F5F8FA", "listed_count": 11, "profile_background_image_url_https": "", "utc_offset": null, "statuses_count": 1085, "description": "Mammy & Wife\\u2764Feminist~Living Simply~Community Support Worker~Hyper-Empathic #TogetherForYes #Repealthe8th #StarWars #GlobalEquality #Grunge #Irishfolklore\\ud83d\\udc9a", "friends_count": 576, "location": "Ireland", "profile_link_color": "1DA1F2", "profile_image_url": "http://pbs.twimg.com/profile_images/979507130343411712/ZraoxJD3_normal.jpg", "following": null, "geo_enabled": true, "profile_banner_url": "https://pbs.twimg.com/profile_banners/780395768595288064/1521243230", "profile_background_image_url": "", "name": "Eir\\u00fa_F\\u00f3dla\\u2122", "lang": "en", "profile_background_tile": false, "favourites_count": 2781, "screen_name": "SarahEiruFodla", "notifications": null, "url": null, "created_at": "Mon Sep 26 13:17:05 +0000 2016", "contributors_enabled": false, "time_zone": null, "protected": false, "translator_type": "none", "is_translator": false}, "geo": null, "in_reply_to_user_id_str": null, "lang": "en", "extended_tweet": {"display_text_range": [0, 279], "entities": {"user_mentions": [], "symbols": [], "hashtags": [{"indices": [30, 41], "text": "savethe8th"}, {"indices": [264, 279], "text": "TogetherForYes"}], "urls": []}, "full_text": "Only when I see supporters of #savethe8th hand out free condoms/pills, deliver realistic sex education, offer quality low cost child care, tackle housing issues, demand more rights/supports/flexibility for working Mums, will I believe they want to reduce abortion\\n#TogetherForYes"}, "created_at": "Tue May 01 22:25:20 +0000 
2018", "filter_level": "low", "in_reply_to_status_id_str": null, "place": null}, "user": {"follow_request_sent": null, "profile_use_background_image": true, "default_profile_image": false, "id": 46169681, "default_profile": false, "verified": false, "profile_image_url_https": "https://pbs.twimg.com/profile_images/977339741266886658/54HlVSgd_normal.jpg", "profile_sidebar_fill_color": "ADED5E", "profile_text_color": "2888C3", "followers_count": 450, "profile_sidebar_border_color": "1EB826", "id_str": "46169681", "profile_background_color": "FFFFFF", "listed_count": 4, "profile_background_image_url_https": "https://pbs.twimg.com/profile_background_images/812957087/5fc996f8750094a442daf05991e32358.jpeg", "utc_offset": 3600, "statuses_count": 28098, "description": "~Pun maker ~Coffee Drinker ~Sleep prioritiser ~ Chicken appreciator ~ #RepealThe8th", "friends_count": 410, "location": "Celbridge, Ireland", "profile_link_color": "0022FF", "profile_image_url": "http://pbs.twimg.com/profile_images/977339741266886658/54HlVSgd_normal.jpg", "following": null, "geo_enabled": true, "profile_banner_url": "https://pbs.twimg.com/profile_banners/46169681/1488638955", "profile_background_image_url": "http://pbs.twimg.com/profile_background_images/812957087/5fc996f8750094a442daf05991e32358.jpeg", "name": "Georgia Tracey", "lang": "en", "profile_background_tile": false, "favourites_count": 2911, "screen_name": "GeorgiaTraceyy", "notifications": null, "url": null, "created_at": "Wed Jun 10 17:31:40 +0000 2009", "contributors_enabled": false, "time_zone": "Dublin", "protected": false, "translator_type": "none", "is_translator": false}, "geo": null, "in_reply_to_user_id_str": null, "lang": "en", "created_at": "Wed May 02 08:01:46 +0000 2018", "filter_level": "low", "in_reply_to_status_id_str": null, "place": null}',
  'tweet_type': 'retweet',
  'user_time_zone': 'Dublin',
  'user_id': '46169681',
  'user_location': 'Celbridge, Ireland'})
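
As the output shows, each element is an (id, document) pair in which multi-valued fields such as hashtags and text arrive as tuples and the tweet field holds the raw tweet as a JSON string. The sketch below shows one way to unpack such a pair. The record is a hand-abbreviated stand-in for the output above, and summarize is a hypothetical helper.

```python
import json

# Abbreviated, hand-made stand-in for the (id, document) pair shown above.
record = (
    "991588546514440193",
    {
        "hashtags": ("savethe8th",),
        "text": ("RT @SarahEiruFodla: Only when I see supporters of #savethe8th",),
        "user_screen_name": "GeorgiaTraceyy",
        "tweet": '{"id": 991588546514440193, "user": {"lang": "en"}}',
    },
)


def summarize(record):
    """Pull a few values out of an (id, document) pair."""
    doc_id, doc = record
    raw = json.loads(doc["tweet"])  # the raw tweet is stored as a JSON string
    return {
        "id": doc_id,
        "hashtags": list(doc["hashtags"]),  # tuple -> list
        "text": doc["text"][0] if doc["text"] else None,
        "user_lang": raw["user"]["lang"],
    }


summary = summarize(record)
print(summary["hashtags"], summary["user_lang"])  # ['savethe8th'] en
```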

Get the top hashtags


In [4]:
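# Flatten the hashtags of every tweet, count each one, and take the ten most common.
(tweets_rdd
    .flatMap(lambda t: t[1]['hashtags'])
    .map(lambda hashtag: (hashtag, 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda pair: pair[1], ascending=False)
    .take(10))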


Out[4]:
[('repealthe8th', 84179),
 ('together4yes', 57253),
 ('savethe8th', 43656),
 ('8thref', 35935),
 ('togetherforyes', 18763),
 ('lovebothvoteno', 18290),
 ('voteyes', 8245),
 ('loveboth', 8109),
 ('latelate', 5920),
 ('men4yes', 5884)]
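
To make the flatMap / map / reduceByKey / sortBy chain easier to follow, here is a rough local equivalent in plain Python. It is only an illustration of the logic, not how Spark executes it, and the sample records are made up.

```python
from collections import Counter
from itertools import chain

# Hypothetical sample in the same (id, document) shape as tweets_rdd.
sample = [
    ("1", {"hashtags": ("repealthe8th", "8thref")}),
    ("2", {"hashtags": ("repealthe8th",)}),
    ("3", {"hashtags": ()}),
    ("4", {"hashtags": ("savethe8th", "8thref", "repealthe8th")}),
]


def top_hashtags(records, n=10):
    # flatMap: flatten every document's hashtags into one stream.
    hashtags = chain.from_iterable(doc["hashtags"] for _, doc in records)
    # map + reduceByKey: count the occurrences of each hashtag.
    counts = Counter(hashtags)
    # sortBy(descending) + take(n).
    return counts.most_common(n)


top = top_hashtags(sample, n=2)
print(top)  # [('repealthe8th', 3), ('8thref', 2)]
```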

Get the top user languages

This is done by parsing the raw JSON in each document's tweet field and reading user.lang from it, which shows how to reach values that are not indexed as separate fields.


In [5]:
import json

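# Parse the raw tweet JSON once and keep the result cached for reuse.
parsed_tweets_rdd = tweets_rdd.map(lambda x: json.loads(x[1]['tweet'])).persist()

# Count tweets per user language and take the ten most common.
(parsed_tweets_rdd
    .map(lambda t: (t['user']['lang'], 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda pair: pair[1], ascending=False)
    .take(10))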


Out[5]:
[('en', 441308),
 ('en-gb', 19521),
 ('ja', 4473),
 ('en-GB', 4193),
 ('es', 2195),
 ('fr', 1582),
 ('ga', 992),
 ('de', 878),
 ('it', 812),
 ('ru', 517)]
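
Note that 'en-gb' and 'en-GB' are counted separately above because the codes differ only in case. A minimal sketch of one possible fix is to normalize the code before counting. normalize_lang is a hypothetical helper and the sample values are made up.

```python
from collections import Counter


def normalize_lang(code):
    """Lower-case a language code and trim whitespace; empty values become None."""
    return code.strip().lower() if code else None


# Hypothetical sample of user.lang values.
langs = ["en", "en-gb", "en-GB", "ja", "en-GB", None]
counts = Counter(normalize_lang(lang) for lang in langs if lang)
print(counts["en-gb"])  # 3
```

In the notebook, the same function could be applied inside the map step, for example map(lambda t: (normalize_lang(t['user']['lang']), 1)).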

Using a SQL table

Create a DataFrame from the index and register it as a SQL table.


In [6]:
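from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Elasticsearch mappings do not record whether a field holds a single value or
# an array, so name the array fields explicitly for the connector.
es_conf['es.read.field.as.array.include'] = 'hashtags,text,urls'
tweets_df = sqlContext.read.format("org.elasticsearch.spark.sql").options(**es_conf).load()
# Register the DataFrame so it can be queried as the table "tweets".
tweets_df.createOrReplaceTempView("tweets")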

In [7]:
tweets_df.printSchema()


root
 |-- created_at: timestamp (nullable = true)
 |-- dataset_id: string (nullable = true)
 |-- favorite_count: integer (nullable = true)
 |-- has_geo: boolean (nullable = true)
 |-- has_media: boolean (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- in_reply_to_status_id: string (nullable = true)
 |-- in_reply_to_user_id: string (nullable = true)
 |-- mention_screen_names: string (nullable = true)
 |-- mention_user_ids: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- retweet_quoted_status_id: string (nullable = true)
 |-- retweeted_quoted_screen_name: string (nullable = true)
 |-- retweeted_quoted_user_id: string (nullable = true)
 |-- text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tweet: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- tweet_type: string (nullable = true)
 |-- urls: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- user_follower_count: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_language: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_time_zone: string (nullable = true)
 |-- user_utc_offset: string (nullable = true)
 |-- user_verified: boolean (nullable = true)
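
In the schema above, mention_screen_names and mention_user_ids are typed as string, although the RDD output earlier shows them as tuples. The sketch below assumes that listing them in es.read.field.as.array.include would make the connector read them as arrays too. This has not been verified against the cluster. with_array_fields is a hypothetical helper that builds the setting without modifying the original es_conf.

```python
def with_array_fields(es_conf, fields):
    """Return a copy of es_conf with the given fields declared as arrays."""
    updated = dict(es_conf)  # leave the caller's dict untouched
    updated["es.read.field.as.array.include"] = ",".join(fields)
    return updated


es_conf = {
    "es.nodes": "tweetsets.library.gwu.edu",
    "es.port": "9200",
    "es.resource": "tweets-ba2157/doc",
}
# Assumption: the two mention fields are multi-valued as well.
array_fields = ["hashtags", "text", "urls", "mention_screen_names", "mention_user_ids"]
df_conf = with_array_fields(es_conf, array_fields)
print(df_conf["es.read.field.as.array.include"])
```

Working on a copy keeps the DataFrame-only setting out of the configuration used for the RDD.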

Get the top time zones


In [8]:
tz_df = sqlContext.sql("SELECT user_time_zone, count(user_time_zone) FROM tweets group by user_time_zone order by count(user_time_zone) desc")
tz_df.show(10, truncate=False)


+--------------------------+---------------------+
|user_time_zone            |count(user_time_zone)|
+--------------------------+---------------------+
|Dublin                    |113521               |
|Pacific Time (US & Canada)|37240                |
|London                    |30191                |
|Amsterdam                 |19888                |
|Casablanca                |15636                |
|Europe/Dublin             |8694                 |
|Eastern Time (US & Canada)|7565                 |
|Hawaii                    |4303                 |
|Central Time (US & Canada)|3470                 |
|Europe/London             |1890                 |
+--------------------------+---------------------+
only showing top 10 rows
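
count(user_time_zone) counts only non-null values, so tweets from users without a time zone form a group whose count is 0 and never reach the top of the list. The sketch below reproduces this with SQLite's in-memory engine, used here only as a local stand-in for Spark SQL. The rows are made up, and the extra count(*) column is added for comparison.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets (user_time_zone TEXT)")
# Hypothetical rows: three tweets have no time zone.
rows = [("Dublin",), ("Dublin",), ("London",), (None,), (None,), (None,)]
conn.executemany("INSERT INTO tweets VALUES (?)", rows)

query = """
    SELECT user_time_zone,
           count(user_time_zone) AS with_zone,  -- ignores NULLs
           count(*) AS all_rows                 -- counts every row
    FROM tweets
    GROUP BY user_time_zone
    ORDER BY with_zone DESC
"""
result = conn.execute(query).fetchall()
print(result)  # [('Dublin', 2, 2), ('London', 1, 1), (None, 0, 3)]
conn.close()
```

Using count(*) in the query instead would show how many tweets have no time zone at all.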

Get the top hashtags


In [9]:
hashtags_df = sqlContext.sql("SELECT hashtag, count(hashtag) from (SELECT explode(hashtags) hashtag FROM tweets) group by hashtag order by count(hashtag) desc")
hashtags_df.show(10, truncate=False)


+--------------+--------------+
|hashtag       |count(hashtag)|
+--------------+--------------+
|repealthe8th  |84179         |
|together4yes  |57253         |
|savethe8th    |43656         |
|8thref        |35935         |
|togetherforyes|18763         |
|lovebothvoteno|18290         |
|voteyes       |8245          |
|loveboth      |8109          |
|latelate      |5920          |
|men4yes       |5884          |
+--------------+--------------+
only showing top 10 rows

Get the top URLs


In [10]:
urls_df = sqlContext.sql("SELECT url, count(url) from (SELECT explode(urls) url FROM tweets) where not url like 'http://twitter.com%' group by url order by count(url) desc")
urls_df.show(10, truncate=False)


+----------------------------------------------------------------------------------------------+----------+
|url                                                                                           |count(url)|
+----------------------------------------------------------------------------------------------+----------+
|http://checktheregister.ie                                                                    |569       |
|http://undecided8.org                                                                         |314       |
|http://www.irishtimes.com/opinion/anti-abortion-posters-fail-to-take-account-of-life-1.3470187|294       |
|http://bit.ly/2keysma                                                                         |292       |
|http://www.checktheregister.ie                                                                |279       |
|http://jrnl.ie/3986043t                                                                       |163       |
|http://crowdfund.togetherforyes.ie                                                            |150       |
|http://www.checktheregister.ie/publicpages/default.aspx?uilang=                               |137       |
|http://youtu.be/itsxbbkp-tq                                                                   |135       |
|http://adoption.ie/wp-content/uploads/2018/04/ara-position-paper-on-8th-amendment.pdf         |131       |
+----------------------------------------------------------------------------------------------+----------+
only showing top 10 rows
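
In the table above, http://checktheregister.ie and http://www.checktheregister.ie are counted as different URLs, and the filter only excludes links that start with http://twitter.com. A rough sketch of one way to group such variants is to key each URL on its host (without a leading "www.") and path. url_key and host_of are hypothetical helpers, the sample list is made up, and the key deliberately drops the scheme and the query string, so it is lossy.

```python
from collections import Counter
from urllib.parse import urlsplit


def host_of(url):
    """Lower-cased host name without a leading 'www.'."""
    host = (urlsplit(url).hostname or "").lower()
    return host[4:] if host.startswith("www.") else host


def url_key(url):
    """Grouping key: host plus path, ignoring scheme, query and a trailing slash."""
    return host_of(url) + urlsplit(url).path.rstrip("/")


# Hypothetical sample of URLs.
urls = [
    "http://checktheregister.ie",
    "http://www.checktheregister.ie",
    "https://www.checktheregister.ie/",
    "http://undecided8.org",
    "https://twitter.com/i/web/status/1",
]
counts = Counter(url_key(u) for u in urls if host_of(u) != "twitter.com")
print(counts.most_common(1))  # [('checktheregister.ie', 3)]
```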

Get the top retweets


In [11]:
rt_df = sqlContext.sql("SELECT CONCAT('https://twitter.com/', retweeted_quoted_screen_name, '/status/', retweet_quoted_status_id), count(retweet_quoted_status_id) FROM tweets group by retweet_quoted_status_id, retweeted_quoted_screen_name order by count(retweet_quoted_status_id) desc")
rt_df.show(10, truncate=False)


+----------------------------------------------------------------------------------------------+-------------------------------+
|concat(https://twitter.com/, retweeted_quoted_screen_name, /status/, retweet_quoted_status_id)|count(retweet_quoted_status_id)|
+----------------------------------------------------------------------------------------------+-------------------------------+
|https://twitter.com/Together4yes/status/993174221265174529                                    |4549                           |
|https://twitter.com/markohalloran/status/987318846980751360                                   |2251                           |
|https://twitter.com/campaignforleo/status/987646457045020672                                  |1958                           |
|https://twitter.com/amyhuberman/status/987303602514530304                                     |1614                           |
|https://twitter.com/Sarah_Hyland/status/993520504052092928                                    |1474                           |
|https://twitter.com/campaignforleo/status/988744931501133825                                  |1261                           |
|https://twitter.com/NursepollyRgn/status/985280763942916096                                   |1222                           |
|https://twitter.com/SimonHarrisTD/status/989998862164164609                                   |1221                           |
|https://twitter.com/Iam_here_2018/status/989056513753874433                                   |1128                           |
|https://twitter.com/RealJamesWoods/status/993575065332600834                                  |998                            |
+----------------------------------------------------------------------------------------------+-------------------------------+
only showing top 10 rows

Get the top trigrams


In [13]:
from pyspark.ml.feature import RegexTokenizer, NGram, StopWordsRemover
from pyspark.sql.functions import sort_array, udf, explode
from pyspark.sql.types import ArrayType, StringType

# One row per distinct tweet text, so identical texts (e.g. repeated retweets) are counted once
text_df = tweets_df.select(explode("text").alias("text")).distinct()

# Tokenize
# Raw string so that the regex escapes reach Spark unchanged.
tokenizer = RegexTokenizer(pattern=r"([:\.!?,]|'s|’s)*\s+[‘]*", inputCol="text", outputCol="words")
tokenized_df = tokenizer.transform(text_df)

# Stopwords
stop_words = StopWordsRemover.loadDefaultStopWords('english')
stop_words.extend(['rt', ' ', '-', '&amp;', 'it’s', '', 'may', 'see', 'want', 'i’m', 'us', 'make', "we've", "you're", "you've", "don't", "i’ve", 'it', 'they’re', 'don’t', 'lets', 'add'])
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=stop_words)
filtered_df = remover.transform(tokenized_df)

# Remove hashtags and URLs, and drop duplicate words within a tweet.
def clean(arr):
    # str.startswith accepts a tuple of prefixes.
    return list({item for item in arr if not item.startswith(('#', 'http'))})

clean_udf = udf(clean, ArrayType(StringType()))
clean_df = filtered_df.withColumn("clean_words", clean_udf(filtered_df.filtered_words))

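# Sort the words. Because the words are deduplicated and sorted alphabetically,
# each trigram below is a run of three alphabetically adjacent words from one
# tweet, not three consecutive words.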
sorted_df = clean_df.select(sort_array('clean_words').alias('sorted_words'))

ngram = NGram(n=3, inputCol="sorted_words", outputCol="ngrams")
ngram_df = ngram.transform(sorted_df).select(explode('ngrams').alias('ngrams'))
ngram_df.groupBy('ngrams').count().orderBy('count', ascending=False).show(20, truncate=False)


+--------------------------------------------+-----+
|ngrams                                      |count|
+--------------------------------------------+-----+
|vote women yes                              |362  |
|voting women yes                            |243  |
|help please support                         |122  |
|vote voting yes                             |117  |
|8th abortion amendment                      |104  |
|@josephamadigan @simonharristd @together4yes|99   |
|care change compassion                      |88   |
|@conmurphysport @gordonwdarcy @kevinmcgahern|85   |
|@andyleeboxing @conmurphysport @gordonwdarcy|83   |
|irish referendum urged                      |78   |
|@gordonwdarcy @kevinmcgahern @richiesadlier |75   |
|please privilege right                      |74   |
|registered sure vote                        |74   |
|perfect please privilege                    |73   |
|child every life                            |72   |
|@simonharristd @together4yes abortion       |72   |
|life perfect please                         |72   |
|@kevinmcgahern @richiesadlier @together4yes |71   |
|emigrants irish referendum                  |70   |
|every life perfect                          |69   |
+--------------------------------------------+-----+
only showing top 20 rows
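
Because the words of each tweet are deduplicated and sorted before the n-grams are built, a result such as "vote women yes" means that the three words co-occur in a tweet and are alphabetical neighbours there, not that they appear as a phrase. The plain-Python sketch below mimics the pipeline on two made-up texts to show this. The stop word list is a tiny stand-in for the one used in the notebook, and the regex is a Python adaptation of the tokenizer pattern, using a non-capturing group so that re.split does not return the delimiters.

```python
import re
from collections import Counter

# Delimiter: optional trailing punctuation or possessive, whitespace, optional opening quote.
SPLIT_RE = re.compile(r"(?:[:.!?,]|'s|’s)*\s+‘*")
# Tiny stand-in for the notebook's stop word list.
STOP_WORDS = {"rt", "the", "to", "a", "is", "and", "for"}


def tokens(text):
    """Lower-case the text and split it on the delimiter pattern."""
    return [t for t in SPLIT_RE.split(text.lower()) if t]


def clean_sorted_words(text):
    """Drop stop words, hashtags, URLs and duplicates, then sort."""
    words = {
        w for w in tokens(text)
        if w not in STOP_WORDS and not w.startswith(("#", "http"))
    }
    return sorted(words)


def trigrams(words):
    """Every run of three neighbouring words, joined by spaces."""
    return [" ".join(words[i:i + 3]) for i in range(len(words) - 2)]


# Hypothetical tweet texts.
texts = [
    "RT @user: Vote yes for women! #repealthe8th",
    "Women, vote yes today https://t.co/x",
]
counts = Counter(g for t in texts for g in trigrams(clean_sorted_words(t)))
print(counts.most_common(1))  # [('vote women yes', 2)]
```

Neither sample text contains the phrase "vote women yes", yet it is the top trigram, which matches the pattern seen in the table above.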