In [1]:
/** Verify that the kernel is up and running */
println(s"Kernel is up and running")
In [2]:
/* specify schema for clickstream data */
import org.apache.spark.sql.types._
val clickdataSchema = StructType(Array(
  StructField("eventId", LongType, false),
  StructField("eventType", StringType, false),
  StructField("timestamp", StringType, false),
  StructField("ipaddress", StringType, false),
  StructField("sessionId", StringType, false),
  StructField("userId", StringType, false),
  StructField("pageUrl", StringType, false),
  StructField("browser", StringType, false)))
In [3]:
/** BLUSpark imports and connection information */
import sys.process._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.ibm.bluspark.catalog.TableSchema
import com.ibm.bluspark.oltp.OLTPContext
import com.ibm.bluspark.example.DataGenerator
import com.ibm.bluspark.common.ConfigurationReader
import com.ibm.bluspark.oltp.InsertResult
import com.ibm.bluspark.example.BluSparkUtil
// host:port of the BLUSpark engine; replace XX.XX.XX.XX with your address
ConfigurationReader.setConnectionEndpoints("XX.XX.XX.XX:5555")
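Hardcoding the address makes the notebook awkward to share; as a hypothetical alternative, the endpoint can be read from an environment variable (BLUSPARK_ENDPOINT is an assumed name, not part of the bluspark API):

// Hypothetical: read the endpoint from the environment so no address is committed.
val endpoint = sys.env.getOrElse("BLUSPARK_ENDPOINT", "XX.XX.XX.XX:5555")
ConfigurationReader.setConnectionEndpoints(endpoint)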
In [4]:
/** Run this cell only if you need to CREATE the database. */
val oltpContext = OLTPContext.createDatabase("CLICKDB")
val error = oltpContext.openDatabase()
error.foreach(e => sys.error(e.toString))
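openDatabase(), dropTable(), and createTable() all report failure through an Option; a small hypothetical helper (not part of the bluspark API) keeps those checks uniform:

// Hypothetical helper: fail fast with a labelled message when a step returns an error.
def failIfError(step: String, err: Option[Any]): Unit =
  err.foreach(e => sys.error(s"$step failed: $e"))
// e.g. failIfError("openDatabase", oltpContext.openDatabase())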
In [5]:
/** Define the table schema for clickstream data: the trailing arrays name the
    sharding-key and primary-key columns (both eventId here). */
val clickStreamSchema = TableSchema("ClickStreamTable", clickdataSchema, Array("eventId"), Array("eventId"))
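For reference, the same constructor accepts composite keys; a hypothetical variant sharding by user while keeping events unique per user might look like:

// Hypothetical variant: shard on userId, primary key (userId, eventId).
val altSchema = TableSchema("ClickStreamTable", clickdataSchema,
  Array("userId"), Array("userId", "eventId"))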
In [6]:
/** Create the table, dropping any existing copy first */
var res = oltpContext.dropTable(clickStreamSchema.tableName)
res = oltpContext.createTable(clickStreamSchema)
if (res.isDefined) {
  println(s"Error while creating table ${clickStreamSchema.tableName}:\n${res.get}")
} else {
  println(s"Table ${clickStreamSchema.tableName} successfully created.")
}
In [7]:
/** Get a handle to the newly created table */
val clickstreamTable = oltpContext.getTable("ClickStreamTable")
In [8]:
/** Initialize the Spark session and load the clickstream CSV */
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrameReader
val spark: SparkSession = SparkSession.
  builder().
  getOrCreate()
import spark.implicits._
val clickStreamDF = spark.read.
  option("header", "true").
  option("inferSchema", "false"). // explicit schema below, so no inference needed
  schema(clickdataSchema).
  csv("click_stream_dataV52.csv")
clickStreamDF.show(5)
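Before ingesting, two quick checks with standard Spark operations confirm the row count and that the non-nullable key column contains no nulls:

// Sanity-check the loaded data (standard Spark API).
println(s"Rows read: ${clickStreamDF.count()}")
println(s"Null eventIds: ${clickStreamDF.filter($"eventId".isNull).count()}")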
In [9]:
/** Insert the rows in batch via a local iterator */
import collection.JavaConverters._
val iter = clickStreamDF.toLocalIterator()
val error = oltpContext.batchInsert(clickstreamTable, iter.asScala)
if (error.isDefined) {
  System.err.println(error.get)
} else {
  println("Ingest completed successfully")
}