In [ ]:
import org.apache.spark.sql._

// Storage/bucket placeholders — expected to be substituted with real values
// (e.g. an S3/GCS bucket name and scheme) before the notebook is run.
val working_storage = "WORKING_STORAGE"
val output_directory = "jupyter/scala"
val protocol_name = "PROTOCOL_NAME"
// SQLContext built from the notebook-provided SparkContext `sc`.
// NOTE(review): SQLContext is the legacy entry point; SparkSession is the
// Spark 2+ replacement — kept as-is since later cells depend on `sqlCtx`.
val sqlCtx = new SQLContext(sc)
val hc = sc.hadoopConfiguration
// Force Hive to use classic MapReduce execution rather than Tez/Spark.
hc.set("hive.execution.engine", "mr")

/** Resolves `path` against the shared input dataset location:
  * `<protocol>://<working_storage>/jupyter_dataset/<path>`.
  */
def bucketPath(path: String): String =
    s"$protocol_name://$working_storage/jupyter_dataset/$path"
/** Resolves `path` against this notebook's output location:
  * `<protocol>://<working_storage>/<output_directory>/<path>`.
  */
def fullPath(path: String): String =
    s"$protocol_name://$working_storage/$output_directory/$path"

In [ ]:
// Load the carrier lookup table from CSV (header row, schema inferred),
// persist it as Parquet, and register it as a temp view for SQL queries.
// The file already targets Spark 2+ (createOrReplaceTempView), where CSV is a
// built-in source — "com.databricks.spark.csv" is only the legacy external
// package name for the same reader, so use the canonical "csv" format.
val carriers = sqlCtx.read.
                        format("csv").
                        option("inferSchema", "true").
                        option("header", "true").
                        load(bucketPath("carriers.csv"))
carriers.write.mode(SaveMode.Overwrite).parquet(fullPath("carriers/"))
carriers.createOrReplaceTempView("carriers")
carriers.show(20)

In [ ]:
// Load the airport lookup table from CSV (header row, schema inferred),
// persist it as Parquet, and register it as a temp view for SQL queries.
// Spark 2+ (implied by createOrReplaceTempView) ships CSV built in, so the
// canonical "csv" format replaces the legacy "com.databricks.spark.csv" alias.
val airports = sqlCtx.read.
                        format("csv").
                        option("inferSchema", "true").
                        option("header", "true").
                        load(bucketPath("airports.csv"))
airports.write.mode(SaveMode.Overwrite).parquet(fullPath("airports/"))
airports.createOrReplaceTempView("airports")
airports.show(20)

In [ ]:
import sqlCtx.implicits._

// Load the 2008 flights dataset (bzip2-compressed CSV), treating the literal
// string "NA" as null. Spark 2+ (implied by createOrReplaceTempView) ships CSV
// built in, so the canonical "csv" format replaces the legacy
// "com.databricks.spark.csv" package name.
val flights_w_na = sqlCtx.read.
                        format("csv").
                        option("inferSchema", "true").
                        option("header", "true").
                        option("nullValue", "NA").
                        load(bucketPath("2008.csv.bz2"))
// Replace remaining nulls in numeric columns with 0 (DataFrameNaFunctions.fill).
val flights = flights_w_na.na.fill(0)
flights.write.mode(SaveMode.Overwrite).parquet(fullPath("flights/"))
flights.createOrReplaceTempView("flights")
flights.select($"ArrDelay",$"CarrierDelay",$"WeatherDelay",$"Distance").show(20)

In [ ]: