install the streaming twitter jar in the notebook from the Github repo


In [3]:
import pixiedust
jarPath = "https://github.com/ibm-watson-data-lab/spark.samples/raw/master/dist/streaming-twitter-assembly-1.6.jar"
pixiedust.installPackage(jarPath)


Pixiedust database opened successfully
Pixiedust version 0.68
Package already installed: https://github.com/ibm-watson-data-lab/spark.samples/raw/master/dist/streaming-twitter-assembly-1.6.jar
Out[3]:
<pixiedust.packageManager.package.Package at 0x7f6397a97150>

Use Scala Bridge to run the command line version of the app

For instruction on how to set up the twitter and Tone Analyzer credentials, please refer to https://developer.ibm.com/clouddataservices/2016/01/15/real-time-sentiment-analysis-of-twitter-hashtags-with-spark/


In [4]:
twitterConsumerKey = "SOOo6EmsqAbfVMEidXy12DvRZ"
twitterConsumerSecret = "Ni1OIqqkei0aq60vC8wrei2WPTpCCX4j0EXEBd80PPebOgUZKk"
twitterAccessToken = "404118511-z4rf7f1Qm85oWQncf7Y59yc1oKHQjhRFOdRhN2Wm"
twitterAccessTokenSecret = "QqUSWaJr7GCak1P75PheBstQJjbZyrRZSRfzqFMyjvjEP"
toneAnalyzerPassword = "UMioqyrFAaNi"
toneAnalyzerUserName = "a3e0dd21-ebe9-4475-a6ab-eb2f6382db27"

In [5]:
%%scala
val demo = com.ibm.cds.spark.samples.StreamingTwitter
demo.setConfig("twitter4j.oauth.consumerKey",twitterConsumerKey)
demo.setConfig("twitter4j.oauth.consumerSecret",twitterConsumerSecret)
demo.setConfig("twitter4j.oauth.accessToken",twitterAccessToken)
demo.setConfig("twitter4j.oauth.accessTokenSecret",twitterAccessTokenSecret)
demo.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer/api")
demo.setConfig("watson.tone.password",toneAnalyzerPassword)
demo.setConfig("watson.tone.username",toneAnalyzerUserName)

import org.apache.spark.streaming._
demo.startTwitterStreaming(sc, Seconds(30))


Starting twitter stream
Twitter stream started
Tweets are collected real-time and analyzed
To stop the streaming and start interacting with the data use: StreamingTwitter.stopTwitterStreaming
Receiver Started: TwitterReceiver-0
Batch started with 105 records
Batch completed with 105 records
Batch started with 246 records
Stopping Twitter stream. Please wait this may take a while
Receiver Stopped: TwitterReceiver-0
Reason:  : Stopped by driver
Batch completed with 246 records
Twitter stream stopped
You can now create a sqlContext and DataFrame with 24 Tweets created. Sample usage: 
val (sqlContext, df) = com.ibm.cds.spark.samples.StreamingTwitter.createTwitterDataFrames(sc)
df.printSchema
sqlContext.sql("select author, text from tweets").show

In [6]:
%%scala
val demo = com.ibm.cds.spark.samples.StreamingTwitter
val (__sqlContext, __df) = demo.createTwitterDataFrames(sc)


A new table named tweets with 24 records has been correctly created and can be accessed through the SQLContext variable
Here's the schema for tweets
root
 |-- author: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- date: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- text: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- Anger: double (nullable = true)
 |-- Disgust: double (nullable = true)
 |-- Fear: double (nullable = true)
 |-- Joy: double (nullable = true)
 |-- Sadness: double (nullable = true)
 |-- Analytical: double (nullable = true)
 |-- Confident: double (nullable = true)
 |-- Tentative: double (nullable = true)
 |-- Openness: double (nullable = true)
 |-- Conscientiousness: double (nullable = true)
 |-- Extraversion: double (nullable = true)
 |-- Agreeableness: double (nullable = true)
 |-- EmotionalRange: double (nullable = true)

Do some data science with the DataFrame __df obtained from the scala code above


In [7]:
tweets=__df
tweets.count()
display(tweets)


Hey, there's something awesome here! To see it, open this notebook outside GitHub, in a viewer like Jupyter
type: struct
field:
{'metadata': {}, 'type': 'string', 'name': 'author', 'nullable': True}
{'metadata': {}, 'type': 'string', 'name': 'userid', 'nullable': True}
{'metadata': {}, 'type': 'string', 'name': 'date', 'nullable': True}
{'metadata': {}, 'type': 'string', 'name': 'lang', 'nullable': True}
{'metadata': {}, 'type': 'string', 'name': 'text', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'lat', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'long', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Anger', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Disgust', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Fear', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Joy', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Sadness', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Analytical', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Confident', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Tentative', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Openness', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Conscientiousness', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Extraversion', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'Agreeableness', 'nullable': True}
{'metadata': {}, 'type': 'double', 'name': 'EmotionalRange', 'nullable': True}
Showing 24 of 24
author
userid
date
lang
text
lat
long
Anger
Disgust
Fear
Joy
Sadness
Analytical
Confident
Tentative
Openness
Conscientiousness
Extraversion
Agreeableness
EmotionalRange
author
userid
date
lang
text
lat
long
Anger
Disgust
Fear
Joy
Sadness
Analytical
Confident
Tentative
Openness
Conscientiousness
Extraversion
Agreeableness
EmotionalRange
kay 🎅🏼🎁 falloutjackss Wed Nov 30 11:01:02 CST 2016 en You are quite optimistic about recent events on the home front... More for Virgo https://t.co/tnDgFfVOih 0.0 0.0 21.0 4.0 14.0 59.0 9.0 0.0 0.0 95.0 47.0 4.0 61.0 59.0 0.0
Ted Regulski tedregulski Wed Nov 30 11:01:02 CST 2016 en @Lin_Manuel Which is harder to get, NES Classic or Hamilton tickets? I'm not sure... :) 0.0 0.0 15.0 3.0 8.0 6.0 76.0 0.0 0.0 99.0 16.0 1.0 8.0 32.0 0.0
Nat _natalyaj Wed Nov 30 11:01:02 CST 2016 en @_sazZzy @nyleclemons https://t.co/k30S7lHFaB 0.0 0.0 6.0 1.0 15.0 82.0 2.0 0.0 0.0 0.0 7.0 15.0 27.0 46.0 0.0
SeeAych CoolKidCheeks Wed Nov 30 11:01:02 CST 2016 en @belikelika it's the water content that messes it up innit 0.0 0.0 9.0 22.0 9.0 38.0 28.0 69.0 0.0 0.0 53.0 22.0 36.0 3.0 0.0
wxldfirelarry Wed Nov 30 11:01:02 CST 2016 en RT @ShadyDLAGFacts: Smallest one there but has the biggest range.. Ladies and Gentlemen, Demi Lovato https://t.co/ToDRgNLdzc 0.0 0.0 3.0 7.0 9.0 57.0 29.0 0.0 0.0 0.0 31.0 23.0 56.0 18.0 0.0
wuss poppin jim🅱o epilepticshrimp Wed Nov 30 11:01:02 CST 2016 en @NHL what 0.0 0.0 5.0 24.0 28.0 5.0 50.0 0.0 0.0 0.0 21.0 13.0 44.0 60.0 0.0
Noel NoelcNoel Wed Nov 30 11:01:02 CST 2016 en Who is more pixelated, Olbermann or Glenn Beck? https://t.co/YGsvWfMmst 0.0 0.0 25.0 9.0 22.0 35.0 14.0 0.0 0.0 89.0 85.0 15.0 71.0 47.0 0.0
Keven Kevenbh1 Wed Nov 30 11:01:02 CST 2016 en RT @neurohate: mood https://t.co/kgpj8f2ZW4 0.0 0.0 5.0 1.0 2.0 84.0 11.0 0.0 0.0 0.0 4.0 27.0 51.0 59.0 0.0
Diamond DiamondM26 Wed Nov 30 11:01:02 CST 2016 en @DiamondM26 and then my teacher called on me relentlessly like Miss can't you tell I just rolled out of bes 0.0 0.0 36.0 18.0 8.0 3.0 49.0 60.0 0.0 73.0 28.0 76.0 88.0 80.0 0.0
Steve Kirby Skinny_Steve_A1 Wed Nov 30 11:01:02 CST 2016 en @EdMorrissey @malonebarry 2016 continues to 2016. 0.0 0.0 15.0 17.0 7.0 10.0 62.0 0.0 0.0 0.0 12.0 22.0 30.0 65.0 0.0
Adrian YoAdrian25 Wed Nov 30 11:01:02 CST 2016 en RT @Dory: *hits blunt* Bruh do you think the ocean is salty because the land never waves back? https://t.co/iONzPn9eGT 0.0 0.0 45.0 6.0 12.0 10.0 32.0 0.0 37.0 0.0 80.0 62.0 80.0 59.0 0.0
Ga Vinci code gavinteresting Wed Nov 30 11:01:00 CST 2016 en RT @UncleSego: When someone says your joke louder than you and gets all the laughs https://t.co/JArVfnH8TF 0.0 0.0 11.0 14.0 12.0 48.0 21.0 0.0 0.0 0.0 39.0 18.0 68.0 56.0 0.0
pat patriciacpstrno Wed Nov 30 11:01:01 CST 2016 en 5 AM ako gigising but gising pa ko wow nice 0.0 0.0 12.0 3.0 7.0 21.0 64.0 0.0 0.0 0.0 7.0 27.0 52.0 60.0 0.0
Mitch Mitchz fmab_dx Wed Nov 30 11:01:03 CST 2016 en feeling tired 0.0 0.0 12.0 3.0 2.0 29.0 60.0 0.0 0.0 0.0 12.0 13.0 44.0 60.0 0.0
Muhammad Rayyan rayyanmuhammadd Wed Nov 30 11:01:03 CST 2016 en @NurFazlis jual mahal sgt . Difficult hehehehe 0.0 0.0 32.0 21.0 9.0 15.0 28.0 0.0 0.0 0.0 16.0 27.0 54.0 60.0 0.0
Domo d_andreaaaaa Wed Nov 30 11:01:03 CST 2016 en RT @The1stWright: Knowing you're not looking for a relationship >>>> 0.0 0.0 4.0 6.0 11.0 47.0 41.0 0.0 0.0 0.0 19.0 1.0 50.0 28.0 0.0
adore mimi adoremsmimi Wed Nov 30 11:01:03 CST 2016 en Exercising your creativity in everything you do raises the qua... More for Leo https://t.co/DwWzMirmqR 0.0 0.0 4.0 1.0 11.0 84.0 6.0 47.0 0.0 0.0 18.0 47.0 50.0 64.0 0.0
Randy Lewelling RandyLewelling Wed Nov 30 11:01:03 CST 2016 en At a Region 17 workshop #esc17TRC 0.0 0.0 10.0 11.0 9.0 49.0 30.0 35.0 0.0 0.0 31.0 27.0 48.0 55.0 0.0
🌸jisteph pindahsini xlovesephi Wed Nov 30 11:01:03 CST 2016 en Anies Baswedan: 35 Persen Siswa Tak Lulus SMA, Itu Bukan Salah Ahok https://t.co/2nCEuniIkB 0.0 0.0 9.0 1.0 9.0 80.0 6.0 0.0 0.0 0.0 14.0 27.0 54.0 60.0 0.0
#PURPOSETOURINDIA Rechu1825 Wed Nov 30 11:01:03 CST 2016 en @GetPurposeMerch mer 0.0 0.0 4.0 35.0 25.0 7.0 44.0 0.0 0.0 0.0 19.0 13.0 44.0 60.0 0.0
Max Cliburn m_cliburn Wed Nov 30 11:01:03 CST 2016 en RT @hunter_roach23: Ready for the weekend 0.0 0.0 0.0 18.0 13.0 65.0 7.0 0.0 0.0 0.0 20.0 61.0 40.0 48.0 0.0
n. cinnahyun Wed Nov 30 11:01:03 CST 2016 en RT @deansintro: jungkook from bts doing a cover of d(half moon) on v app - hes such a fanboy its cute https://t.co/3m9ThpAY0J 0.0 0.0 9.0 5.0 5.0 68.0 20.0 0.0 1.0 0.0 20.0 9.0 4.0 29.0 0.0
Joya Williams Soy2TheWorld Wed Nov 30 11:01:02 CST 2016 en Manzel Bowman. #soyjoynews https://t.co/6RE1kDnf8s 0.0 0.0 8.0 3.0 5.0 68.0 19.0 0.0 0.0 0.0 23.0 13.0 44.0 60.0 0.0
Asia Asia djasia1 Wed Nov 30 11:01:01 CST 2016 en Just Give Me The Name Niggas 0.0 0.0 9.0 7.0 15.0 8.0 69.0 0.0 0.0 95.0 95.0 48.0 60.0 38.0 0.0

In [8]:
#create an array that will hold the count for each sentiment
sentimentDistribution=[0] * 13
#For each sentiment, run a sql query that counts the number of tweets for which the sentiment score is greater than 60%
#Store the data in the array
for i, sentiment in enumerate(tweets.columns[-13:]):
    sentimentDistribution[i]=__sqlContext.sql("SELECT count(*) as sentCount FROM tweets where " + sentiment + " > 60")\
        .collect()[0].sentCount

In [9]:
%matplotlib inline
import matplotlib
import numpy as np
import matplotlib.pyplot as plt

ind=np.arange(13)
width = 0.35
bar = plt.bar(ind, sentimentDistribution, width, color='g', label = "distributions")

params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2.5, plSize[1]*2) )
plt.ylabel('Tweet count')
plt.xlabel('Tone')
plt.title('Distribution of tweets by sentiments > 60%')
plt.xticks(ind+width, tweets.columns[-13:])
plt.legend()

plt.show()



In [ ]:
from operator import add
import re
tagsRDD = tweets.flatMap( lambda t: re.split("\s", t.text))\
    .filter( lambda word: word.startswith("#") )\
    .map( lambda word : (word, 1 ))\
    .reduceByKey(add, 10).map(lambda (a,b): (b,a)).sortByKey(False).map(lambda (a,b):(b,a))
top10tags = tagsRDD.take(10)

In [ ]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2, plSize[1]*2) )

labels = [i[0] for i in top10tags]
sizes = [int(i[1]) for i in top10tags]
colors = ['yellowgreen', 'gold', 'lightskyblue', 'lightcoral', "beige", "paleturquoise", "pink", "lightyellow", "coral"]

plt.pie(sizes, labels=labels, colors=colors,autopct='%1.1f%%', shadow=True, startangle=90)

plt.axis('equal')
plt.show()

In [ ]:
cols = tweets.columns[-13:]
def expand( t ):
    ret = []
    for s in [i[0] for i in top10tags]:
        if ( s in t.text ):
            for tone in cols:
                ret += [s.replace(':','').replace('-','') + u"-" + unicode(tone) + ":" + unicode(getattr(t, tone))]
    return ret 
def makeList(l):
    return l if isinstance(l, list) else [l]

#Create RDD from tweets dataframe
tagsRDD = tweets.map(lambda t: t )

#Filter to only keep the entries that are in top10tags
tagsRDD = tagsRDD.filter( lambda t: any(s in t.text for s in [i[0] for i in top10tags] ) )

#Create a flatMap using the expand function defined above, this will be used to collect all the scores 
#for a particular tag with the following format: Tag-Tone-ToneScore
tagsRDD = tagsRDD.flatMap( expand )

#Create a map indexed by Tag-Tone keys 
tagsRDD = tagsRDD.map( lambda fullTag : (fullTag.split(":")[0], float( fullTag.split(":")[1]) ))

#Call combineByKey to format the data as follow
#Key=Tag-Tone
#Value=(count, sum_of_all_score_for_this_tone)
tagsRDD = tagsRDD.combineByKey((lambda x: (x,1)),
                  (lambda x, y: (x[0] + y, x[1] + 1)),
                  (lambda x, y: (x[0] + y[0], x[1] + y[1])))

#ReIndex the map to have the key be the Tag and value be (Tone, Average_score) tuple
#Key=Tag
#Value=(Tone, average_score)
tagsRDD = tagsRDD.map(lambda (key, ab): (key.split("-")[0], (key.split("-")[1], round(ab[0]/ab[1], 2))))

#Reduce the map on the Tag key, value becomes a list of (Tone,average_score) tuples
tagsRDD = tagsRDD.reduceByKey( lambda x, y : makeList(x) + makeList(y) )

#Sort the (Tone,average_score) tuples alphabetically by Tone
tagsRDD = tagsRDD.mapValues( lambda x : sorted(x) )

#Format the data as expected by the plotting code in the next cell. 
#map the Values to a tuple as follow: ([list of tone], [list of average score])
#e.g. #someTag:([u'Agreeableness', u'Analytical', u'Anger', u'Cheerfulness', u'Confident', u'Conscientiousness', u'Negative', u'Openness', u'Tentative'], [1.0, 0.0, 0.0, 1.0, 0.0, 0.48, 0.0, 0.02, 0.0])
tagsRDD = tagsRDD.mapValues( lambda x : ([elt[0] for elt in x],[elt[1] for elt in x])  )

#Use custom sort function to sort the entries by order of appearance in top10tags
def customCompare( key ):
    for (k,v) in top10tags:
        if k == key:
            return v
    return 0
tagsRDD = tagsRDD.sortByKey(ascending=False, numPartitions=None, keyfunc = customCompare)

#Take the mean tone scores for the top 10 tags
top10tagsMeanScores = tagsRDD.take(10)

In [ ]:
%matplotlib inline
import matplotlib
import numpy as np
import matplotlib.pyplot as plt

params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*3, plSize[1]*2) )

top5tagsMeanScores = top10tagsMeanScores[:5]
width = 0
ind=np.arange(13)
(a,b) = top5tagsMeanScores[0]
labels=b[0]
colors = ["beige", "paleturquoise", "pink", "lightyellow", "coral", "lightgreen", "gainsboro", "aquamarine","c"]
idx=0
for key, value in top5tagsMeanScores:
    plt.bar(ind + width, value[1], 0.15, color=colors[idx], label=key)
    width += 0.15
    idx += 1
plt.xticks(ind+0.3, labels)
plt.ylabel('AVERAGE SCORE')
plt.xlabel('TONES')
plt.title('Breakdown of top hashtags by sentiment tones')

plt.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc='center',ncol=5, mode="expand", borderaxespad=0.)

plt.show()

Use Twitter demo embedded app to run the same app with a UI


In [10]:
%%scala
val demo = com.ibm.cds.spark.samples.PixiedustStreamingTwitter
demo.setConfig("twitter4j.oauth.consumerKey",twitterConsumerKey)
demo.setConfig("twitter4j.oauth.consumerSecret",twitterConsumerSecret)
demo.setConfig("twitter4j.oauth.accessToken",twitterAccessToken)
demo.setConfig("twitter4j.oauth.accessTokenSecret",twitterAccessTokenSecret)
demo.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer/api")
demo.setConfig("watson.tone.password",toneAnalyzerPassword)
demo.setConfig("watson.tone.username",toneAnalyzerUserName)
demo.setConfig("checkpointDir", System.getProperty("user.home") + "/pixiedust/ssc")

In [11]:
from pixiedust_twitterdemo import *
twitterDemo()


Hey, there's something awesome here! To see it, open this notebook outside GitHub, in a viewer like Jupyter

The embedded app has generated a DataFrame called __tweets. Let's use it to do some data science


In [ ]:
display(__tweets)

In [ ]:
from pyspark.sql import Row
from pyspark.sql.types import *
emotions=__tweets.columns[-13:]
distrib = __tweets.flatMap(lambda t: [(x,t[x]) for x in emotions]).filter(lambda t: t[1]>60)\
    .toDF(StructType([StructField('emotion',StringType()),StructField('score',DoubleType())]))
display(distrib)

In [ ]:
__tweets.registerTempTable("pixiedust_tweets")
#create an array that will hold the count for each sentiment
sentimentDistribution=[0] * 13
#For each sentiment, run a sql query that counts the number of tweets for which the sentiment score is greater than 60%
#Store the data in the array
for i, sentiment in enumerate(__tweets.columns[-13:]):
    sentimentDistribution[i]=sqlContext.sql("SELECT count(*) as sentCount FROM pixiedust_tweets where " + sentiment + " > 60")\
        .collect()[0].sentCount

In [ ]:
%matplotlib inline
import matplotlib
import numpy as np
import matplotlib.pyplot as plt

ind=np.arange(13)
width = 0.35
bar = plt.bar(ind, sentimentDistribution, width, color='g', label = "distributions")

params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2.5, plSize[1]*2) )
plt.ylabel('Tweet count')
plt.xlabel('Tone')
plt.title('Distribution of tweets by sentiments > 60%')
plt.xticks(ind+width, __tweets.columns[-13:])
plt.legend()

plt.show()

In [ ]:
from operator import add
import re
tagsRDD = __tweets.flatMap( lambda t: re.split("\s", t.text))\
    .filter( lambda word: word.startswith("#") )\
    .map( lambda word : (word, 1 ))\
    .reduceByKey(add, 10).map(lambda (a,b): (b,a)).sortByKey(False).map(lambda (a,b):(b,a))
top10tags = tagsRDD.take(10)

In [ ]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*2, plSize[1]*2) )

labels = [i[0] for i in top10tags]
sizes = [int(i[1]) for i in top10tags]
colors = ['yellowgreen', 'gold', 'lightskyblue', 'lightcoral', "beige", "paleturquoise", "pink", "lightyellow", "coral"]

plt.pie(sizes, labels=labels, colors=colors,autopct='%1.1f%%', shadow=True, startangle=90)

plt.axis('equal')
plt.show()

In [ ]:
cols = __tweets.columns[-13:]
def expand( t ):
    ret = []
    for s in [i[0] for i in top10tags]:
        if ( s in t.text ):
            for tone in cols:
                ret += [s.replace(':','').replace('-','') + u"-" + unicode(tone) + ":" + unicode(getattr(t, tone))]
    return ret 
def makeList(l):
    return l if isinstance(l, list) else [l]

#Create RDD from tweets dataframe
tagsRDD = __tweets.map(lambda t: t )

#Filter to only keep the entries that are in top10tags
tagsRDD = tagsRDD.filter( lambda t: any(s in t.text for s in [i[0] for i in top10tags] ) )

#Create a flatMap using the expand function defined above, this will be used to collect all the scores 
#for a particular tag with the following format: Tag-Tone-ToneScore
tagsRDD = tagsRDD.flatMap( expand )

#Create a map indexed by Tag-Tone keys 
tagsRDD = tagsRDD.map( lambda fullTag : (fullTag.split(":")[0], float( fullTag.split(":")[1]) ))

#Call combineByKey to format the data as follow
#Key=Tag-Tone
#Value=(count, sum_of_all_score_for_this_tone)
tagsRDD = tagsRDD.combineByKey((lambda x: (x,1)),
                  (lambda x, y: (x[0] + y, x[1] + 1)),
                  (lambda x, y: (x[0] + y[0], x[1] + y[1])))

#ReIndex the map to have the key be the Tag and value be (Tone, Average_score) tuple
#Key=Tag
#Value=(Tone, average_score)
tagsRDD = tagsRDD.map(lambda (key, ab): (key.split("-")[0], (key.split("-")[1], round(ab[0]/ab[1], 2))))

#Reduce the map on the Tag key, value becomes a list of (Tone,average_score) tuples
tagsRDD = tagsRDD.reduceByKey( lambda x, y : makeList(x) + makeList(y) )

#Sort the (Tone,average_score) tuples alphabetically by Tone
tagsRDD = tagsRDD.mapValues( lambda x : sorted(x) )

#Format the data as expected by the plotting code in the next cell. 
#map the Values to a tuple as follow: ([list of tone], [list of average score])
#e.g. #someTag:([u'Agreeableness', u'Analytical', u'Anger', u'Cheerfulness', u'Confident', u'Conscientiousness', u'Negative', u'Openness', u'Tentative'], [1.0, 0.0, 0.0, 1.0, 0.0, 0.48, 0.0, 0.02, 0.0])
tagsRDD = tagsRDD.mapValues( lambda x : ([elt[0] for elt in x],[elt[1] for elt in x])  )

#Use custom sort function to sort the entries by order of appearance in top10tags
def customCompare( key ):
    for (k,v) in top10tags:
        if k == key:
            return v
    return 0
tagsRDD = tagsRDD.sortByKey(ascending=False, numPartitions=None, keyfunc = customCompare)

#Take the mean tone scores for the top 10 tags
top10tagsMeanScores = tagsRDD.take(10)

In [ ]:
%matplotlib inline
import matplotlib
import numpy as np
import matplotlib.pyplot as plt

params = plt.gcf()
plSize = params.get_size_inches()
params.set_size_inches( (plSize[0]*3, plSize[1]*2) )

top5tagsMeanScores = top10tagsMeanScores[:5]
width = 0
ind=np.arange(13)
(a,b) = top5tagsMeanScores[0]
labels=b[0]
colors = ["beige", "paleturquoise", "pink", "lightyellow", "coral", "lightgreen", "gainsboro", "aquamarine","c"]
idx=0
for key, value in top5tagsMeanScores:
    plt.bar(ind + width, value[1], 0.15, color=colors[idx], label=key)
    width += 0.15
    idx += 1
plt.xticks(ind+0.3, labels)
plt.ylabel('AVERAGE SCORE')
plt.xlabel('TONES')
plt.title('Breakdown of top hashtags by sentiment tones')

plt.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc='center',ncol=5, mode="expand", borderaxespad=0.)

plt.show()