In [ ]:
import org.apache.spark.sql._
// Deployment-time placeholders: object-store bucket name and URI scheme
// (e.g. "s3a", "gs") substituted before the notebook runs.
val working_storage = "WORKING_STORAGE"
// Sub-directory under the bucket where this notebook writes its Parquet output.
val output_directory = "jupyter/scala"
val protocol_name = "PROTOCOL_NAME"
// SQLContext built on the notebook-provided SparkContext `sc`.
// NOTE(review): on Spark 2.x SparkSession is the preferred entry point; kept
// as-is because later cells reference `sqlCtx` directly.
val sqlCtx = new SQLContext(sc)
// Force Hive to execute on plain MapReduce rather than Tez/Spark engines.
val hc = sc.hadoopConfiguration
hc.set("hive.execution.engine", "mr")
/** Absolute object-store URI for `path` inside the shared `jupyter_dataset` input area. */
def bucketPath(path: String): String =
  s"$protocol_name://$working_storage/jupyter_dataset/$path"
/** Absolute object-store URI for `path` under this notebook's output directory. */
def fullPath(path: String): String =
  s"$protocol_name://$working_storage/$output_directory/$path"
In [ ]:
// Load the carriers lookup table from CSV, persist it as Parquet, and expose
// it to Spark SQL as the temp view "carriers".
// Uses the CSV source built into Spark 2.x (this file already requires 2.0+
// via createOrReplaceTempView) instead of the external
// "com.databricks.spark.csv" package name, removing that dependency.
val carriers = sqlCtx.read.
  option("inferSchema", "true"). // sample the file to derive column types
  option("header", "true").      // first row holds column names
  csv(bucketPath("carriers.csv"))
// Overwrite any previous run's output so the notebook is re-runnable.
carriers.write.mode(SaveMode.Overwrite).parquet(fullPath("carriers/"))
carriers.createOrReplaceTempView("carriers")
carriers.show(20)
In [ ]:
// Load the airports lookup table from CSV, persist it as Parquet, and expose
// it to Spark SQL as the temp view "airports".
// Uses the CSV source built into Spark 2.x (this file already requires 2.0+
// via createOrReplaceTempView) instead of the external
// "com.databricks.spark.csv" package name, removing that dependency.
val airports = sqlCtx.read.
  option("inferSchema", "true"). // sample the file to derive column types
  option("header", "true").      // first row holds column names
  csv(bucketPath("airports.csv"))
// Overwrite any previous run's output so the notebook is re-runnable.
airports.write.mode(SaveMode.Overwrite).parquet(fullPath("airports/"))
airports.createOrReplaceTempView("airports")
airports.show(20)
In [ ]:
// $-string column syntax (e.g. $"ArrDelay") used in the select below.
import sqlCtx.implicits._
// Load the 2008 flights fact table from a bzip2-compressed CSV.
// Uses the CSV source built into Spark 2.x (this file already requires 2.0+
// via createOrReplaceTempView) instead of the external
// "com.databricks.spark.csv" package name, removing that dependency.
val flights_w_na = sqlCtx.read.
  option("inferSchema", "true"). // sample the file to derive column types
  option("header", "true").      // first row holds column names
  option("nullValue", "NA").     // the dataset encodes missing values as "NA"
  csv(bucketPath("2008.csv.bz2"))
// Replace the nulls produced by nullValue="NA" with 0 in numeric columns
// so downstream aggregations do not have to null-guard the delay fields.
val flights = flights_w_na.na.fill(0)
// Overwrite any previous run's output so the notebook is re-runnable.
flights.write.mode(SaveMode.Overwrite).parquet(fullPath("flights/"))
flights.createOrReplaceTempView("flights")
flights.select($"ArrDelay",$"CarrierDelay",$"WeatherDelay",$"Distance").show(20)
In [ ]: