The following code creates the DataFrame manually: it reads the compressed CSV into an RDD, strips the header line, builds an all-string schema from the header, converts the remaining lines to Rows, and finally casts each column to its proper type.
In [ ]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
In [ ]:
val dataPath = "../../data/expedia"
val dataName = "train500k"
val fileName = s"$dataPath/$dataName.csv.bz2"
val textRDD = sc.textFile(fileName)
In [ ]:
val header = textRDD.first()
val noHeaderRDD = textRDD.filter(line => line != header)
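As an optional sanity check (not part of the original flow), the filtered RDD should contain exactly one line fewer than the raw file:
In [ ]:
// optional check: data lines = raw lines - 1 header line
println(s"raw lines: ${textRDD.count()}, data lines: ${noHeaderRDD.count()}")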
In [ ]:
// build a schema that initially treats every column as a nullable string
val fields = header.split(',').map(StructField(_, StringType, true))
val schema = StructType(fields)
schema.foreach(println)
In [ ]:
// split with limit -1 so trailing empty fields are kept and every row matches the schema width
val rowRDD = noHeaderRDD.map(_.split(",", -1)).map(Row.fromSeq(_))
val df = sqlContext.createDataFrame(rowRDD, schema)
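At this point every column of `df` is still a plain string, which is why the explicit casts in the next cell are needed. A quick look (optional) makes that visible:
In [ ]:
// all columns are StringType until they are cast below
df.printSchema()
df.show(5)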
In [ ]:
val df2 = df.select(
df("date_time").cast(TimestampType),
df("site_name").cast(IntegerType),
df("posa_continent").cast(IntegerType),
df("user_location_country").cast(IntegerType),
df("user_location_region").cast(IntegerType),
df("user_location_city").cast(IntegerType),
df("orig_destination_distance").cast(DoubleType),
df("user_id").cast(IntegerType),
df("is_mobile").cast(IntegerType),
df("is_package").cast(IntegerType),
df("channel").cast(IntegerType),
df("srch_ci").cast(DateType),
df("srch_co").cast(DateType),
df("srch_adults_cnt").cast(IntegerType),
df("srch_children_cnt").cast(IntegerType),
df("srch_rm_cnt").cast(IntegerType),
df("srch_destination_id").cast(IntegerType),
df("srch_destination_type_id").cast(IntegerType),
df("is_booking").cast(IntegerType),
df("cnt").cast(IntegerType),
df("hotel_continent").cast(IntegerType),
df("hotel_country").cast(IntegerType),
df("hotel_market").cast(IntegerType),
df("hotel_cluster").cast(IntegerType)
)
df2.printSchema()
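Casting leaves null wherever a value could not be parsed (for example an empty orig_destination_distance field, if present). An optional count shows how many rows are affected:
In [ ]:
// optional: rows where the distance could not be cast to a Double
df2.filter(df2("orig_destination_distance").isNull).count()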
In [ ]:
// coalesce to a single partition so the output is one Parquet file
df2.coalesce(1).write.parquet(s"$dataPath/$dataName.parquet")
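To confirm the write, the Parquet output can be read back (optional); the schema should match the one printed above. This assumes the write completed under the same dataPath.
In [ ]:
// optional: read the Parquet output back and confirm the typed schema survived the round trip
val dfCheck = sqlContext.read.parquet(s"$dataPath/$dataName.parquet")
dfCheck.printSchema()
println(dfCheck.count())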