Create a Parquet file

This notebook shows how to create a Parquet file from a matrix stored as an object file. Once converted to Parquet, the matrix can be loaded into other notebook kernels, such as Python and R.

Dependencies


In [16]:
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

Configuration


In [17]:
var dir_path = "hdfs:///user/hadoop/spring-index/"
var offline_dir_path = "hdfs:///user/pheno/spring-index/"
var geoTiff_dir = "BloomFinal"
val toBeMasked = true

var mask_str = ""
if (toBeMasked)
    mask_str = "_mask"

var grids_matrix_path = offline_dir_path + geoTiff_dir + "/grids_matrix" + mask_str
var grids_parquet_path = offline_dir_path + geoTiff_dir + "/grids_matrix" + mask_str + ".parquet"


dir_path = hdfs:///user/hadoop/spring-index/
offline_dir_path = hdfs:///user/pheno/spring-index/
geoTiff_dir = BloomFinal
toBeMasked = true
mask_str = _mask
grids_matrix_path = hdfs:///user/pheno/spring-index/BloomFinal/grids_matrix_mask
grids_parquet_path = hdfs:///user/pheno/spring-index/BloomFinal/grids_matrix_mask.parquet
Out[17]:
hdfs:///user/pheno/spring-index/BloomFinal/grids_matrix_mask.parquet
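
The mutable `var` pattern above can also be written with immutable values, computing the mask suffix as an `if` expression. A minimal sketch using the same values as the configuration cell (the camelCase names are this sketch's own):

```scala
// Build the mask suffix and derived paths as immutable values.
val toBeMasked = true
val maskStr = if (toBeMasked) "_mask" else ""

val offlineDirPath = "hdfs:///user/pheno/spring-index/"
val geoTiffDir = "BloomFinal"
val gridsMatrixPath = offlineDirPath + geoTiffDir + "/grids_matrix" + maskStr
val gridsParquetPath = gridsMatrixPath + ".parquet"
```

Using `val` avoids accidental reassignment later in the notebook; in Scala, `if` is an expression and returns a value directly.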

Read Matrix


In [18]:
// Load the matrix previously saved with RDD.saveAsObjectFile
var grids_matrix: RDD[Vector] = sc.emptyRDD
grids_matrix = sc.objectFile(grids_matrix_path)


grids_matrix = MapPartitionsRDD[21] at objectFile at <console>:63
Out[18]:
MapPartitionsRDD[21] at objectFile at <console>:63

Create DataFrame

The DataFrame will have years as columns. Since the matrix is transposed and saved as an RDD[Vector], each vector becomes a row.


In [21]:
// Define the column names
val start_year = 1980
val end_year = 2015
var cols :Array[String] = new Array[String](end_year-start_year+1)
for (f <- start_year to end_year) {
    cols(f-start_year) = f.toString
}

val schema = new StructType(cols.map( m => StructField(m, DoubleType, nullable = true)))
val rowRDD = grids_matrix.map( m => Row.fromSeq(m.toArray))
var matrixDF = spark.createDataFrame(rowRDD, schema)


start_year = 1980
end_year = 2015
cols = Array(1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015)
schema = StructType(StructField(1980,DoubleType,true), StructField(1981,DoubleType,true), StructField(1982,DoubleType,true), StructField(1983,DoubleType,true), StructField(1984,DoubleType,true), StructField(1985,DoubleType,true), StructField(1986,DoubleType,true), StructField(1987,DoubleType,true), StructField(1988,DoubleType,true), StructField(1989,DoubleType,true), StructField(1990,DoubleType,true), StructField(1991,DoubleType,true), StructField(1992,DoubleType,tr...
Out[21]:
StructType(StructField(1980,DoubleType,true), StructField(1981,DoubleType,true), StructField(1982,DoubleType,true), StructField(1983,DoubleType,true), StructField(1984,DoubleType,true), StructField(1985,DoubleType,true), StructField(1986,DoubleType,true), StructField(1987,DoubleType,true), StructField(1988,DoubleType,true), StructField(1989,DoubleType,true), StructField(1990,DoubleType,true), StructField(1991,DoubleType,true), StructField(1992,DoubleType,true), StructField(1993,DoubleType,true), StructField(1994,DoubleType,true), StructField(1995,DoubleType,true), StructField(1996,DoubleType,true), StructField(1997,DoubleType,true), StructField(1998,DoubleType,true), StructField(1999,DoubleType,true), StructField(2000,DoubleType,true), StructField(2001,DoubleType,true), StructField(2002,DoubleType,true), StructField(2003,DoubleType,true), StructField(2004,DoubleType,true), StructField(2005,DoubleType,true), StructField(2006,DoubleType,true), StructField(2007,DoubleType,true), StructField(2008,DoubleType,true), StructField(2009,DoubleType,true), StructField(2010,DoubleType,true), StructField(2011,DoubleType,true), StructField(2012,DoubleType,true), StructField(2013,DoubleType,true), StructField(2014,DoubleType,true), StructField(2015,DoubleType,true))
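
The loop above that fills `cols` index by index can also be written as a single range expression. A minimal sketch producing the same array:

```scala
// One column name per year: "1980" .. "2015" (36 columns).
val startYear = 1980
val endYear = 2015
val cols: Array[String] = (startYear to endYear).map(_.toString).toArray
```

This is equivalent to the indexed `for` loop, but leaves no room for off-by-one errors in the index arithmetic.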

Save Dataframe into a Parquet


In [23]:
// Note: fails if the path already exists; add .mode("overwrite") to replace it.
matrixDF.write.parquet(grids_parquet_path)


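As a quick sanity check, the Parquet file can be read back in the same Spark session before switching to another kernel. A sketch, assuming the notebook's `spark` session and `grids_parquet_path` are still in scope:

```scala
// Read the Parquet file back and inspect the schema and a few rows.
val parquetDF = spark.read.parquet(grids_parquet_path)
parquetDF.printSchema()                  // expect 36 DoubleType columns, "1980" .. "2015"
parquetDF.select("1980", "2015").show(5) // peek at the first and last year columns
```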

In [ ]: