In this notebook, we show how to load the custom library generated as part of the Twitter + Watson Tone Analyzer streaming application. The code can be found here: https://github.com/ibm-cds-labs/spark.samples/tree/master/streaming-twitter. The following code uses a pre-built jar that has been posted on the GitHub project, but you can replace it with your own URL if needed.
In [1]:
%AddJar https://github.com/ibm-cds-labs/spark.samples/raw/master/dist/streaming-twitter-assembly-1.6.jar -f
In [2]:
// Replace each XXXX placeholder with your own Twitter and Watson Tone Analyzer credentials
val demo = com.ibm.cds.spark.samples.StreamingTwitter
demo.setConfig("twitter4j.oauth.consumerKey","XXXX")
demo.setConfig("twitter4j.oauth.consumerSecret","XXXX")
demo.setConfig("twitter4j.oauth.accessToken","XXXX")
demo.setConfig("twitter4j.oauth.accessTokenSecret","XXXX")
demo.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer-beta/api")
demo.setConfig("watson.tone.password","XXXX")
demo.setConfig("watson.tone.username","XXXX")
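Hard-coding credentials in a notebook makes them easy to leak when the notebook is shared. As a minimal sketch, assuming the values are supplied through environment variables, the configuration could be assembled and checked before it is passed to `demo.setConfig`. The environment variable names and the `loadConfig` helper below are hypothetical and not part of the sample project; only the configuration keys come from the cell above.

```scala
// Hypothetical mapping from environment variable names (an assumption, not
// defined by the sample project) to the configuration keys used above.
val envToConfigKey: Seq[(String, String)] = Seq(
  "TWITTER_CONSUMER_KEY"        -> "twitter4j.oauth.consumerKey",
  "TWITTER_CONSUMER_SECRET"     -> "twitter4j.oauth.consumerSecret",
  "TWITTER_ACCESS_TOKEN"        -> "twitter4j.oauth.accessToken",
  "TWITTER_ACCESS_TOKEN_SECRET" -> "twitter4j.oauth.accessTokenSecret",
  "WATSON_TONE_URL"             -> "watson.tone.url",
  "WATSON_TONE_USERNAME"        -> "watson.tone.username",
  "WATSON_TONE_PASSWORD"        -> "watson.tone.password"
)

// Returns Right(config key -> value) when every variable is set and non-blank,
// otherwise Left(names of the missing variables).
def loadConfig(env: Map[String, String]): Either[Seq[String], Map[String, String]] = {
  val missing = envToConfigKey.collect {
    case (envName, _) if env.get(envName).forall(_.trim.isEmpty) => envName
  }
  if (missing.nonEmpty) Left(missing)
  else Right(envToConfigKey.map { case (envName, key) => key -> env(envName) }.toMap)
}

loadConfig(sys.env) match {
  case Right(config) =>
    // In the notebook, each pair would then be applied with:
    //   config.foreach { case (key, value) => demo.setConfig(key, value) }
    println(s"Loaded ${config.size} configuration values")
  case Left(missing) =>
    println("Missing environment variables: " + missing.mkString(", "))
}
```

Returning the list of missing names, rather than failing on the first one, lets you fix every gap in one pass before starting the stream.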
Start a new Twitter stream that collects live tweets and enriches them with sentiment analysis scores. The stream runs for the duration given as the second argument of the startTwitterStreaming method. Note: if no duration is specified, the stream runs until the stopTwitterStreaming method is called.
In [3]:
import org.apache.spark.streaming._
demo.startTwitterStreaming(sc, Seconds(40))
In [4]:
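// Create a SQLContext and a DataFrame from the collected tweets; the queries below read them through the tweets table
val (sqlContext, df) = demo.createTwitterDataFrames(sc)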
In [5]:
val fullSet = sqlContext.sql("select * from tweets") //Select all columns
fullSet.show
In [6]:
// Select the text of the tweets whose Anger score is above 60
val set = sqlContext.sql("select text from tweets where Anger > 60")
println(set.count)
set.show
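To make the filter concrete, here is a plain-Scala sketch of what the query above computes, run against a small in-memory collection instead of the `tweets` table. The `ScoredTweet` case class, the `Double` score type, and the sample rows are made up for illustration; only the `text` and `Anger` column names come from the query.

```scala
// Hypothetical stand-in for one row of the tweets table; only `text` and
// `anger` mirror columns named in the query, the rest of the schema is omitted.
case class ScoredTweet(text: String, anger: Double)

// Made-up sample rows for illustration (not real output of the stream).
val sample = Seq(
  ScoredTweet("first sample tweet", 12.0),
  ScoredTweet("second sample tweet", 75.5),
  ScoredTweet("third sample tweet", 60.0)
)

// Equivalent of: select text from tweets where Anger > 60
// The comparison is strict, so a score of exactly 60 is excluded.
val angryTexts = sample.filter(_.anger > 60).map(_.text)

println(angryTexts.size)
angryTexts.foreach(println)
```

Because the comparison is strict, use `>=` in the query if tweets scoring exactly 60 should be included.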
In [7]:
// Write the full set to Object Storage as Parquet, using a single partition
fullSet.repartition(1).saveAsParquetFile("swift://notebooks.spark/tweetsFull.parquet")