This notebook is the second in our series. We are going to use the Apache Spark and Cloudant Bluemix services to read data from our IBM Cloudant Bluemix service into Spark DataFrames.
Please reference the first notebook in our series, Streaming Meetups to IBM Cloudant using Spark, for a detailed list of prerequisites to get up and running.
To run this demonstration notebook we use the cloudant-spark library. This Scala dependency (jar) is added to our environment using the AddJar magic from the Spark Kernel, which makes the specified jar available to both the Spark Kernel and the Spark cluster.
In [1]:
%AddJar https://dl.dropboxusercontent.com/u/19043899/cloudant-spark.jar
The Bluemix Apache Spark service notebook comes with a Spark context ready to use, but we need to modify its configuration to enable built-in support for Cloudant. Note that for demo purposes we set the Spark master to run in local mode, whereas by default the Spark service runs in cluster mode. Update HOST, USERNAME, and PASSWORD below with the credentials for the Bluemix Cloudant service our demo depends on. You can get these credentials from the Palette on the right by clicking the Data Source option: if your data source does not exist yet, add it with the Add Source button; if it already exists, use the "Insert to code" button to add its credentials to the notebook.
In [2]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
// Start from the kernel's existing Spark configuration
val conf = sc.getConf

// Cloudant connection settings used by the cloudant-spark connector
conf.set("cloudant.host","HOST")
conf.set("cloudant.username", "USERNAME")
conf.set("cloudant.password","PASSWORD")

// Ship the jars on the kernel's classpath (including cloudant-spark added above) to the cluster
conf.setJars(ClassLoader.getSystemClassLoader.asInstanceOf[java.net.URLClassLoader].getURLs.map(_.toString).toSet.toSeq ++ kernel.interpreter.classLoader.asInstanceOf[java.net.URLClassLoader].getURLs.map(_.toString).toSeq)

// Allow a second SparkContext alongside the notebook's default one, running in local mode for the demo
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("spark.master","local[*]")

// Create the Cloudant-enabled SparkContext and print its configuration
val scCloudant = new SparkContext(conf)
scCloudant.getConf.getAll.foreach(println)
In [3]:
def readFromDatabase(sqlContext: SQLContext, databaseName: String): DataFrame = {
  // Load the given Cloudant database into a DataFrame using the cloudant-spark connector
  val df = sqlContext.read.format("com.cloudant.spark").load(databaseName)
  df
}
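As written, readFromDatabase relies on the Cloudant host and credentials we set on the SparkContext configuration above. Depending on the connector version, these settings may also be accepted as per-read options on the standard DataFrameReader; the sketch below shows that variant. The option names simply mirror the cloudant.* SparkConf keys used earlier and are an assumption about this connector, not something the demo depends on.

// Hypothetical variant: pass Cloudant settings per read instead of via SparkConf.
// Assumption: this version of the cloudant-spark connector honors these read options;
// the option names mirror the cloudant.* SparkConf keys set above.
def readFromDatabaseWithOptions(sqlContext: SQLContext, databaseName: String): DataFrame = {
  sqlContext.read.format("com.cloudant.spark")
    .option("cloudant.host", "HOST")
    .option("cloudant.username", "USERNAME")
    .option("cloudant.password", "PASSWORD")
    .load(databaseName)
}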
In [4]:
val sqlContext = new SQLContext(scCloudant)
import sqlContext.implicits._

// Read the meetup_group database into a DataFrame and preview a few rows
val df = readFromDatabase(sqlContext, "meetup_group")
df.show(5)

// Show only the meetup groups located in Austin
df.filter(df("group_city")==="Austin").show(5)
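As an optional extension beyond the original demo, the same DataFrame can be registered as a temporary table and queried with Spark SQL; the table name meetup_groups below is arbitrary.

// Optional sketch: register the DataFrame as a temporary table (Spark 1.x API)
// and run the equivalent of the filter above as a Spark SQL query.
df.registerTempTable("meetup_groups")
sqlContext.sql("SELECT * FROM meetup_groups WHERE group_city = 'Austin'").show(5)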