Convert CSV input file to parquet

  • Spark does not support out-of-the box data frame creation from CSV files.
  • Databricks' spark-csv provides such functionality but requires an extra library.

The following code creates the data frame manually.


In [ ]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

In [ ]:
val dataPath = "../../data/expedia"
val dataName = "train500k"
val fileName = s"$dataPath/$dataName.csv.bz2"

val textRDD = sc.textFile(fileName)

Get header and exclude it from input file

Because all data is distributed we don't know which partition contains the first line. Hence, we need a filter to exclude it from further processing.


In [ ]:
val header = textRDD.first()
val noHeaderRDD = textRDD.filter(line => line != header)

Define data schema

In order to handle NA values correctly define all columns as String. We will later convert each row into the appropriate type.


In [ ]:
val fields = header.split(',').map(StructField(_, StringType, true))
val schema = StructType(fields)
schema.foreach(println)

Create DataFrame


In [ ]:
val rowRDD = noHeaderRDD.map(_.split(',')).map(Row.fromSeq(_))
val df = sqlContext.createDataFrame(rowRDD, schema)

Set correct column types


In [ ]:
val df2 = df.select(
    df("date_time").cast(TimestampType),
    df("site_name").cast(IntegerType),
    df("posa_continent").cast(IntegerType),
    df("user_location_country").cast(IntegerType),
    df("user_location_region").cast(IntegerType),
    df("user_location_city").cast(IntegerType),
    df("orig_destination_distance").cast(DoubleType),
    df("user_id").cast(IntegerType),
    df("is_mobile").cast(IntegerType),
    df("is_package").cast(IntegerType),
    df("channel").cast(IntegerType),
    df("srch_ci").cast(DateType),
    df("srch_co").cast(DateType),
    df("srch_adults_cnt").cast(IntegerType),
    df("srch_children_cnt").cast(IntegerType),
    df("srch_rm_cnt").cast(IntegerType),
    df("srch_destination_id").cast(IntegerType),
    df("srch_destination_type_id").cast(IntegerType),
    df("is_booking").cast(IntegerType),
    df("cnt").cast(IntegerType),
    df("hotel_continent").cast(IntegerType),
    df("hotel_country").cast(IntegerType),
    df("hotel_market").cast(IntegerType),
    df("hotel_cluster").cast(IntegerType)
    )
df2.printSchema()

In [ ]:
df2.coalesce(1).write.save(s"$dataPath/$dataName.parquet")