Setting up the Elasticsearch and Spark cluster is described at https://github.com/justinlittman/TweetSets.
To run pyspark-notebook you will need the elasticsearch-hadoop jar (elasticsearch-hadoop-6.2.2.jar):

docker run -it --rm -p 8888:8888 --net=host --pid=host -e TINI_SUBREAPER=true -v ~/notebooks:/home/jovyan/work -v ~/elasticsearch-hadoop-6.2.2.jar:/home/jovyan/elasticsearch-hadoop-6.2.2.jar jupyter/pyspark-notebook

A few notes: the -v options mount a local notebooks directory and the elasticsearch-hadoop jar into the container, and --net=host and --pid=host run the container in the host's network and PID namespaces.
In [1]:
import os
import pyspark
# Add the elasticsearch-hadoop jar
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/elasticsearch-hadoop-6.2.2.jar pyspark-shell'
conf = pyspark.SparkConf()
# Point to the master.
conf.setMaster("spark://tweetsets.library.gwu.edu:7101")
conf.setAppName("pyspark-elasticsearch-demo")
conf.set("spark.driver.bindAddress", "0.0.0.0")
# Don't hog all of the cores.
conf.set("spark.cores.max", "3")
# Specify a port for the block manager (which runs as part of the worker). The range 7003-7028 is set
# to be open in the Spark worker container.
conf.set("spark.blockManager.port", "7004")
# Create the context.
sc = pyspark.SparkContext(conf=conf)
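Once the context is created, a quick sanity check (a minimal sketch; any small job will do) confirms that the notebook can reach the Spark master and run tasks on the workers:

# Confirm the Spark version and run a trivial job on the cluster.
print(sc.version)
print(sc.parallelize(range(1000)).sum())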
In [2]:
# Configure for the Elasticsearch cluster and index.
es_conf = {"es.nodes": "tweetsets.library.gwu.edu",
           "es.port": "9200",
           "es.resource": "tweets-ba2157/doc"}
# Each record is a key/value pair in which the value is a map of the document's fields.
tweets_rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                                "org.apache.hadoop.io.NullWritable",
                                "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                                conf=es_conf)
In [3]:
tweets_rdd.first()
Out[3]:
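Beyond peeking at the first record, it can be worth checking how many documents the connector reads from the index (the count will, of course, depend on your dataset):

# Count the tweets loaded from the Elasticsearch index.
tweets_rdd.count()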
In [4]:
# Count the top hashtags.
tweets_rdd.flatMap(lambda t: t[1]['hashtags']) \
          .map(lambda x: (x, 1)) \
          .reduceByKey(lambda x, y: x + y) \
          .sortBy(lambda x: x[1], ascending=False) \
          .take(10)
Out[4]:
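Hashtags are counted as-is above, so case variants of the same hashtag are tallied separately. A small variation (a sketch, not part of the original notebook) lowercases them before counting:

# Lowercase hashtags before counting so that case variants are combined.
tweets_rdd.flatMap(lambda t: t[1]['hashtags']) \
          .map(lambda x: (x.lower(), 1)) \
          .reduceByKey(lambda x, y: x + y) \
          .sortBy(lambda x: x[1], ascending=False) \
          .take(10)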
In [5]:
import json
# Parse the original tweet JSON stored in the tweet field and cache the result.
parsed_tweets_rdd = tweets_rdd.map(lambda x: json.loads(x[1]['tweet'])).persist()
# Count tweets by the user's language.
parsed_tweets_rdd.map(lambda t: (t['user']['lang'], 1)) \
                 .reduceByKey(lambda x, y: x + y) \
                 .sortBy(lambda x: x[1], ascending=False) \
                 .take(10)
Out[5]:
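Since the parsed tweets are the full tweet JSON, any other field is available as well. For example, here is a sketch (assuming the standard Twitter API structure, in which retweets carry a retweeted_status field) that counts retweets versus original tweets:

# Count retweets vs. original tweets by the presence of retweeted_status.
parsed_tweets_rdd.map(lambda t: ('retweet' if 'retweeted_status' in t else 'original', 1)) \
                 .reduceByKey(lambda x, y: x + y) \
                 .collect()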
In [6]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Tell the connector which fields to read as arrays.
es_conf['es.read.field.as.array.include'] = 'hashtags,text,urls'
tweets_df = sqlContext.read.format("org.elasticsearch.spark.sql").options(**es_conf).load()
tweets_df.createOrReplaceTempView("tweets")
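With the temporary view registered, arbitrary Spark SQL queries can be run against the tweets. A trivial first query to confirm everything is wired up:

# Count the tweets available to Spark SQL.
sqlContext.sql("SELECT count(*) FROM tweets").show()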
In [7]:
tweets_df.printSchema()
In [8]:
tz_df = sqlContext.sql("SELECT user_time_zone, count(user_time_zone)"
                       " FROM tweets GROUP BY user_time_zone ORDER BY count(user_time_zone) DESC")
tz_df.show(10, truncate=False)
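The same query can also be expressed with the DataFrame API instead of SQL; a sketch of the equivalent (note that count() here counts rows per group, so null time zones are tallied slightly differently than count(user_time_zone)):

# Equivalent of the time zone query using the DataFrame API.
tweets_df.groupBy('user_time_zone') \
         .count() \
         .orderBy('count', ascending=False) \
         .show(10, truncate=False)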
In [9]:
hashtags_df = sqlContext.sql("SELECT hashtag, count(hashtag)"
                             " FROM (SELECT explode(hashtags) hashtag FROM tweets)"
                             " GROUP BY hashtag ORDER BY count(hashtag) DESC")
hashtags_df.show(10, truncate=False)
In [10]:
urls_df = sqlContext.sql("SELECT url, count(url)"
                         " FROM (SELECT explode(urls) url FROM tweets)"
                         " WHERE NOT url LIKE 'http://twitter.com%'"
                         " GROUP BY url ORDER BY count(url) DESC")
urls_df.show(10, truncate=False)
In [11]:
rt_df = sqlContext.sql("SELECT CONCAT('https://twitter.com/', retweeted_quoted_screen_name, '/status/', retweet_quoted_status_id),"
                       " count(retweet_quoted_status_id)"
                       " FROM tweets"
                       " GROUP BY retweet_quoted_status_id, retweeted_quoted_screen_name"
                       " ORDER BY count(retweet_quoted_status_id) DESC")
rt_df.show(10, truncate=False)
In [13]:
from pyspark.ml.feature import RegexTokenizer, NGram, StopWordsRemover
from pyspark.sql.functions import sort_array, udf, explode
from pyspark.sql.types import ArrayType, StringType
# Text (using distinct)
text_df = tweets_df.select(explode("text").alias("text")).distinct()
# Tokenize
tokenizer = RegexTokenizer(pattern="([:\.!?,]|'s|’s)*\\s+[‘]*", inputCol="text", outputCol="words")
tokenized_df = tokenizer.transform(text_df)
# Stopwords
stop_words = StopWordsRemover.loadDefaultStopWords('english')
stop_words.extend(['rt', ' ', '-', '&', 'it’s', '', 'may', 'see', 'want', 'i’m', 'us', 'make', "we've", "you're", "you've", "don't", "i’ve", 'it', 'they’re', 'don’t', 'lets', 'add'])
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words", stopWords=stop_words)
filtered_df = remover.transform(tokenized_df)
# Remove hashtags and URLs and dupes
def clean(arr):
    new_arr = set()
    for item in arr:
        add_to_arr = True
        for startswith in ('#', 'http'):
            if item.startswith(startswith):
                add_to_arr = False
        if add_to_arr:
            new_arr.add(item)
    return list(new_arr)
clean_udf = udf(lambda arr: clean(arr), ArrayType(StringType()))
clean_df = filtered_df.withColumn("clean_words", clean_udf(filtered_df.filtered_words))
# Sort the words
sorted_df = clean_df.select(sort_array('clean_words').alias('sorted_words'))
ngram = NGram(n=3, inputCol="sorted_words", outputCol="ngrams")
ngram_df = ngram.transform(sorted_df).select(explode('ngrams').alias('ngrams'))
ngram_df.groupBy('ngrams').count().orderBy('count', ascending=False).show(20, truncate=False)
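The same pipeline works for other n-gram sizes. For example, a sketch of the bigram (n=2) version, reusing sorted_df from above:

# Bigrams from the same cleaned, sorted words.
bigram = NGram(n=2, inputCol="sorted_words", outputCol="bigrams")
bigram_df = bigram.transform(sorted_df).select(explode('bigrams').alias('bigrams'))
bigram_df.groupBy('bigrams').count().orderBy('count', ascending=False).show(20, truncate=False)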