Spark Streaming sample application using Twitter, Watson Tone Analyzer, Event Hub and Message Hub

This notebook shows how to run a Spark Streaming application from a notebook. Be aware of the following limitations:

  1. The application stops when the page is refreshed or closed.
  2. As events are processed, the application generates a lot of console output, which can cause memory to build up in the browser. Running the application for a long time is therefore not recommended.

The code can be found here: https://github.com/ibm-watson-data-lab/spark.samples/tree/master/streaming-twitter. The following cell uses a pre-built jar that has been posted on GitHub, but you can replace it with your own URL if needed.


In [ ]:
%AddJar https://github.com/DTAIEB/demos/raw/master/streaming-twitter-assembly-1.6.jar -f

Set up the credentials for the different services

Refer to the tutorial for details on how to find the credentials for each service, then replace the XXXX placeholders in the code below with your own values.


In [ ]:
val demo = com.ibm.cds.spark.samples.MessageHubStreamingTwitter
val config = demo.getConfig()

//Watson Tone Analyzer service
config.setConfig("watson.tone.url","https://gateway.watsonplatform.net/tone-analyzer-beta/api")
config.setConfig("watson.tone.password","XXXX")
config.setConfig("watson.tone.username","XXXX")

//Message Hub/Kafka service
config.setConfig("bootstrap.servers","kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka05-prod01.messagehub.services.us-south.bluemix.net:9093")
config.setConfig("api_key","XXXX")
config.setConfig("kafka.topic.tweet","twitter-spark")
config.setConfig("kafka.user.name","XXXX")
config.setConfig("kafka.user.password","XXXX")
config.setConfig("kafka_rest_url","https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443")

//Spark Streaming checkpointing configuration with Object Storage Swift container
config.setConfig("name","spark")
config.setConfig("auth_url","https://identity.open.softlayer.com")
config.setConfig("project_id","XXXX")
config.setConfig("region","dallas")
config.setConfig("user_id","XXXX")
config.setConfig("password","XXXX")
config.setConfig("checkpointDir", "swift://notebooks.spark/ssc")
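Before starting the stream, it can help to confirm that no XXXX placeholder was left in place. The following is a minimal sketch that works on a plain Scala Map rather than on the config object, because this notebook does not show how to read values back from it. The unsetKeys helper and the sample settings are hypothetical and are not part of the sample jar.

```scala
// Hypothetical helper, not part of the sample jar:
// returns the keys whose values are still the placeholder, in sorted order.
def unsetKeys(settings: Map[String, String], placeholder: String = "XXXX"): Seq[String] =
  settings.collect { case (key, value) if value.trim == placeholder => key }.toSeq.sorted

// Example values only; mirror whatever you pass to config.setConfig.
val settings = Map(
  "watson.tone.username" -> "XXXX",
  "watson.tone.password" -> "XXXX",
  "kafka.topic.tweet"    -> "twitter-spark",
  "region"               -> "dallas"
)

val missing = unsetKeys(settings)
if (missing.nonEmpty) println(s"Still unset: ${missing.mkString(", ")}")
// prints "Still unset: watson.tone.password, watson.tone.username"
```

Keeping the settings in a Map first and then passing each entry to config.setConfig is one way to make this check possible without depending on the internals of the config object.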

Producing tweets directly from Twitter

Optional: Use the following cell only if your MessageConnect service does not work.
The cell sets your Twitter credentials and calls the code that connects to Twitter, fetches the tweets, and sends them to Message Hub for consumption (refer to the tutorial for more information).


In [ ]:
config.setConfig("twitter4j.oauth.consumerKey","XXXX")
config.setConfig("twitter4j.oauth.consumerSecret","XXXX")
config.setConfig("twitter4j.oauth.accessToken","XXXX")
config.setConfig("twitter4j.oauth.accessTokenSecret","XXXX")
val twitterStream = com.ibm.cds.spark.samples.KafkaProducerTest.createTwitterStream(config)

Start the Spark Stream to collect tweets from Message Hub

Start a new Twitter stream that collects the live tweets and enriches them with sentiment analysis scores. The stream runs for the duration given as the second argument of the startTwitterStreaming method. Note: if no duration is specified, as in the cell below, the stream runs until the stopTwitterStreaming method is called.


In [ ]:
demo.startTwitterStreaming(sc)
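As a variant of the call above, the stream can be given a fixed running time. The sketch below assumes that the second argument is a Spark Streaming Duration such as Seconds(60). This notebook does not state the parameter type, so check the method signature in the sample code before relying on it. The cell depends on the demo and sc values from the earlier cells, and it should be run instead of the previous cell, not in addition to it.

```scala
import org.apache.spark.streaming.Seconds

// Assumption: startTwitterStreaming accepts an org.apache.spark.streaming.Duration
// as its second argument. The 60-second value is only illustrative.
demo.startTwitterStreaming(sc, Seconds(60))
```

With a duration set, the stream should stop by itself, so calling stopTwitterStreaming afterwards would only be needed to end it early.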

Close the Tweet producer

Optional: Run this cell only if you started the tweet producer above.


In [ ]:
com.ibm.cds.spark.samples.KafkaProducerTest.closeTwitterStream

Stop the Spark Stream


In [ ]:
demo.stopTwitterStreaming