In [1]:
/** Verify that the kernel is up and running */
println(s"Kernel is up and running")


Kernel is up and running

In [2]:
/** Specify the schema for the clickstream data */
import org.apache.spark.sql.types._

val clickdataSchema = StructType(Array(
      StructField("eventId", LongType, false),
      StructField("eventType", StringType, false),
      StructField("timestamp", StringType, false),
      StructField("ipaddress", StringType, false),
      StructField("sessionId", StringType, false),
      StructField("userId", StringType, false),
      StructField("pageUrl", StringType, false),
      StructField("browser", StringType, false)))

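A quick sanity check before wiring this schema into BLUSpark is to print it with Spark's built-in StructType helper, which lists each field with its type and nullability and makes a mismatch with the CSV layout easy to spot:

clickdataSchema.printTreeString()
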
In [3]:
/** BLUSpark imports and connection information */
import sys.process._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.ibm.bluspark.catalog.TableSchema
import com.ibm.bluspark.oltp.OLTPContext
import com.ibm.bluspark.example.DataGenerator
import com.ibm.bluspark.common.ConfigurationReader
import com.ibm.bluspark.oltp.InsertResult
import com.ibm.bluspark.example.BluSparkUtil
ConfigurationReader.setConnectionEndpoints("XX.XX.XX.XX:5555")

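For reference, a concrete call would look like the line below; the address is a hypothetical placeholder, assuming the endpoint string takes the usual host:port form:

ConfigurationReader.setConnectionEndpoints("192.0.2.10:5555") // hypothetical address
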
In [4]:
/** Run this cell only if you need to CREATE the database */
val oltpContext = OLTPContext.createDatabase("CLICKDB")
val error = oltpContext.openDatabase()
error.map(e => sys.error(e.toString)) // None in the output means the database opened cleanly


Out[4]:
None

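Out[4] being None signals success: the BLUSpark calls here return None when the operation succeeds and Some(error) otherwise. If you prefer an explicit check, the same result can be handled with a pattern match (a sketch reusing only the calls shown above):

oltpContext.openDatabase() match {
  case Some(err) => sys.error(err.toString)
  case None      => println("CLICKDB opened successfully")
}
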
In [5]:
/** Define the table schema for the clickstream data; the two column arrays
    presumably give the shard key and the primary key (both "eventId" here) */
val clickStreamSchema = TableSchema("ClickStreamTable", clickdataSchema, Array("eventId"), Array("eventId"))

In [6]:
/** Create the table - skip if the table is already created */
oltpContext.dropTable(clickStreamSchema.tableName) // result ignored: the table may not exist yet
val res = oltpContext.createTable(clickStreamSchema)
if (res.isDefined) {
  println(s"Error while creating table ${clickStreamSchema.tableName}\n: ${res.get}")
} else {
  println(s"Table ${clickStreamSchema.tableName} successfully created.")
}


Table ClickStreamTable successfully created.

In [7]:
val clickstreamTable = oltpContext.getTable("ClickStreamTable")

In [8]:
/** Initialize the Spark session */
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrameReader

val spark: SparkSession = SparkSession.
    builder().
    getOrCreate()

import spark.implicits._

val clickStreamDF = spark.read.
    option("header", "true").
    option("inferSchema", false).
    schema(clickdataSchema).
    csv("click_stream_dataV52.csv")

clickStreamDF.show(5)


+-----------+---------+----------+------------+-----------------+------+--------------------+-------+
|    eventId|eventType| timestamp|   ipaddress|        sessionId|userId|             pageUrl|browser|
+-----------+---------+----------+------------+-----------------+------+--------------------+-------+
|20170522901| pageView|1496311260|169.34.56.78|y20170522a4499u21|ceaton|http://www.cybers...| Chrome|
|20170522902| pageView|1496311320|169.34.56.78|y20170522a4499u21|ceaton|http://www.cybers...| Chrome|
|20170522903| pageView|1496311440|169.34.56.78|y20170522a4499u21|ceaton|http://www.cybers...| Chrome|
|20170522904| pageView|1496311500|169.34.56.78|y20170522a4499u21|ceaton|http://www.cybers...| Chrome|
|20170522905| pageView|1496311560|169.34.56.78|y20170522a4499u21|ceaton|http://www.cybers...| Chrome|
+-----------+---------+----------+------------+-----------------+------+--------------------+-------+
only showing top 5 rows


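Because eventId is the table's key and every field is declared non-nullable, a quick pre-ingest sanity check on the DataFrame is worthwhile; malformed CSV rows surface as nulls. This sketch uses only standard Spark APIs:

val totalRows = clickStreamDF.count()
val badKeys = clickStreamDF.filter($"eventId".isNull).count()
println(s"Total rows: $totalRows, rows with a null eventId: $badKeys")
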
In [9]:
/** Insert all rows in one batch via a local iterator */
import collection.JavaConverters._

val iter = clickStreamDF.toLocalIterator()
val error = oltpContext.batchInsert(clickstreamTable, iter.asScala)
if (error.isDefined) {
    System.err.println(s"Ingest failed: ${error.get}")
} else {
    println("Ingest completed successfully")
}


Ingest completed successfully
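
For datasets too large to push through a single call, the same local iterator can be split into fixed-size chunks. This is a sketch only: it assumes batchInsert accepts any Iterator[Row] and returns the same Option-style error as above, and the batch size of 10000 is an arbitrary illustration:

val batches = clickStreamDF.toLocalIterator().asScala.grouped(10000)
batches.zipWithIndex.foreach { case (batch, i) =>
  oltpContext.batchInsert(clickstreamTable, batch.iterator) match {
    case Some(err) => System.err.println(s"Batch $i failed: $err")
    case None      => println(s"Batch $i inserted.")
  }
}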