In [ ]:
sparkR.session()

# Placeholders for the storage location; replace before running
working_storage <- 'WORKING_STORAGE'   # storage bucket/container name
output_directory <- 'jupyter/r'        # directory for the parquet output
protocol_name <- 'PROTOCOL_NAME'       # storage protocol, e.g. s3a

# Build a URI for a file in the source dataset directory
storage_path <- function(file_path) {
    sprintf('%s://%s/jupyter_dataset/%s', protocol_name, working_storage, file_path)
}

# Build a URI for a file under the output directory
full_path <- function(file_path) {
    sprintf('%s://%s/%s/%s', protocol_name, working_storage, output_directory, file_path)
}
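
The helpers above only build URI strings, so printing a couple of sample paths is a quick sanity check that the placeholders and path layout are what you expect. A minimal sketch, assuming working_storage and protocol_name have been replaced with real values:


In [ ]:
# Preview the URIs the notebook will read from and write to
print(storage_path("carriers.csv"))
print(full_path("carriers"))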

Parse and convert Carrier data to parquet


In [ ]:
carriers <- read.df(storage_path("carriers.csv"), "csv", header="true", inferSchema="true")
write.df(carriers, path=full_path("carriers"), source="parquet", mode="overwrite")
createOrReplaceTempView(carriers, "carriers")
head(carriers, 20)
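
Since carriers is now registered as a temporary view, the same data can also be queried with Spark SQL through SparkR's sql() function. A small sketch; the Code and Description column names are assumed from the usual carriers.csv layout:


In [ ]:
# Query the temp view registered above; sql() returns a SparkDataFrame
carriers_sample <- sql("SELECT Code, Description FROM carriers LIMIT 10")
head(carriers_sample, 10)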

Parse and convert Airport data to parquet


In [ ]:
airports <- read.df(storage_path("airports.csv"), "csv", header="true", inferSchema="true")
write.df(airports, path=full_path("airports"), source="parquet", mode="overwrite")
createOrReplaceTempView(airports, "airports")
head(airports, 20)
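
To confirm the parquet output is readable, the written directory can be loaded back with read.df using the parquet source. A minimal check, assuming the write above succeeded:


In [ ]:
# Read the freshly written parquet data back and compare row counts
airports_parquet <- read.df(full_path("airports"), "parquet")
count(airports_parquet) == count(airports)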

Parse and convert Flights data to parquet


In [ ]:
# Read the compressed 2008 flights CSV directly; Spark handles the bz2 codec
flights_w_na <- read.df(storage_path("2008.csv.bz2"), "csv", header="true", inferSchema="true")
# Replace NA with 0 in the delay-related columns (selected by position)
flights <- fillna(flights_w_na, 0, cols=colnames(flights_w_na)[c(15, 16, 25:29)])
write.df(flights, path=full_path("flights"), source="parquet", mode="overwrite")
createOrReplaceTempView(flights, "flights")
colnames(flights)
# Compare a few rows before and after the NA replacement
head(flights_w_na, 5)[c("ArrDelay","CarrierDelay","WeatherDelay","Distance")]
head(flights, 5)[c("ArrDelay","CarrierDelay","WeatherDelay","Distance")]
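
With both the flights and carriers views registered, a cross-view SQL query is a natural next step, for example the average arrival delay per carrier. This is only a sketch and assumes the standard column names of the 2008 on-time dataset (UniqueCarrier, ArrDelay) and of carriers.csv (Code, Description):


In [ ]:
# Average arrival delay per carrier, joining the two temp views
avg_delays <- sql("
  SELECT c.Description AS carrier, AVG(f.ArrDelay) AS avg_arr_delay
  FROM flights f
  JOIN carriers c ON f.UniqueCarrier = c.Code
  GROUP BY c.Description
  ORDER BY avg_arr_delay DESC
")
head(avg_delays, 10)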

In [ ]: