Introduction to PySpark

We'll talk a little bit about Spark's precursor, Hadoop, and then we'll discuss the advantages and utility of Spark on top of Hadoop.

Next, we'll discuss what Dask offers in comparison with Spark.

Finally, we will implement a simple Spark application using the PySpark API.

In the Beginning, There was Hadoop...

Apache Hadoop - an open source software framework for reliable, scalable, distributed computing on commodity hardware

  1. Main modules:
    • Hadoop Common - main Hadoop utilities
    • HDFS - Hadoop Distributed File System
    • Hadoop YARN - a task scheduling / cluster resource management module
    • Hadoop MAP/REDUCE - parallel computing paradigm for big data analytics
  2. Emphasis on reliability and scalability
    • Pervasive Assumption: node failure is the rule, not the exception
    • Task / Cluster management is performed automatically, under the hood
  3. YARN is rack aware
    • Tasks are scheduled on a node where the data is already housed when possible; otherwise, preference is given to a node on the same rack
    • Reduces overall traffic between racks, increasing throughput

Big Data Analytics with Hadoop

  1. Store the data using the HDFS
  2. Define a Mapper class (user defined class extends Mapper) which implements a map function
    • The map function takes in a pair, <k1,v1>, and maps it to a new pair, <k2,v2>
    • The prototypical example is a word count program: map a document to a set of <k,v>=<w,c> pairs
      • The document is distributed as chunks throughout the file system
        • Each chunk of the document becomes a value, v, associated with a key, k, in a <k,v> pair
      • Split each value into tokens (i.e. - words) associated with an iterator, iter
      • For every word in iter, w, create a <k,v>=<w,1> pair
      • Output a multi-set of <w,1> pairs for the chunk
        • We now have one multiset for each chunk of the document
      • Collect the multisets into a single multiset
  3. Define a Reducer class which implements a reduce function
    • The reduce function reduces the output from the mapping to a more meaningful state
      • Reduce the multiset to a set of <w,c> pairs by summing all v with the same k

A Map/Reduce Example Implementation:

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
        // tokenize the chunk and emit a <word, 1> pair for every token
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context
            ) throws IOException, InterruptedException {
        // sum the counts emitted for each word and write the <word, count> pair
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

And Lo! the villagers did rejoice!

...for a time...

Some Problems with Hadoop MAP/REDUCE

1. Every application has to be pigeonholed into the MAP/REDUCE paradigm
2. Studies showed that up to 90% of computation wall time was spent in file I/O
3. Iterative algorithms were especially slow when implemented with MAP/REDUCE
    * The application may bottleneck due to a small number of particularly slow nodes

Enter Spark

  • Originally developed by Matei Zaharia and researchers at UC Berkeley's AMPLab
  • Donated to the Apache Software Foundation in 2013

RDD - Resilient Distributed Dataset

  • The main data structure for Spark applications

  • Fault-tolerant multi-set of data partitions available to a computing cluster in shared memory
  • Requires a cluster manager and a distributed file system
    • Supported cluster managers include Spark's native standalone manager, Hadoop YARN, and Apache Mesos
    • Several supported DFS:
      • HDFS
      • MapR-FS
      • Cassandra
      • ...
    • RDDs can be created from any storage source supported by Hadoop

In-memory processing and a more diverse API are Spark's main benefits over MAP/REDUCE

[Figures: Iterative Operations in Hadoop MAP/REDUCE versus Spark; Interactive Operations in Hadoop MAP/REDUCE versus Spark]
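Expressed in PySpark (introduced below), the idea behind both comparisons is that an RDD can be cached in cluster memory, so repeated iterative or interactive passes over the data avoid the disk round-trips MAP/REDUCE makes between jobs. A minimal sketch, where the SparkContext sc and the toy RDD are illustrative assumptions:

data = sc.parallelize(range(1000))    # RDD built from a local Python collection
data.cache()                          # keep the partitions in memory after first use

# repeated passes reuse the cached partitions instead of re-reading from disk
total = data.map(lambda x: x * 2).reduce(lambda a, b: a + b)
evens = data.filter(lambda x: x % 2 == 0).count()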

PySpark: a Python API for programming Spark applications

* Originally Spark was written in Scala and most Spark applications were written in either Scala or Java
* Eventually support was extended with APIs for Python and R 
* We are going to work with the Python API: PySpark

The SparkShell

Spark has an interactive REPL shell called the SparkShell

Let's create a simple Spark application...

This piece of the presentation borrows heavily from a July 2017 DataCamp tutorial on machine learning and PySpark. Visit the following URL to see the original tutorial:

https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning#gs.Y3MIPIY

Data: California Real Estate

We're going to explore some data and run some iterative algorithms on it. You can find the data here: http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html

Download the cal_housing.tgz tarball at the bottom of the page and extract it somewhere obvious.

First we need to use the findspark package in order to locate the various Spark packages that we need (i.e. - pyspark, etc.). You can download and install findspark using pip as I did, or I'm sure anaconda will also work. There are probably other ways of making sure you can locate the packages you need as well, but this was the simplest and most straightforward I found.


In [1]:
import findspark 
findspark.init()

NOTE: This is only necessary in the Jupyter Notebook. You should be able to import the necessary packages in a regular Python script without using findspark

The SparkSession is the entry point for any Spark application.


In [ ]:
from pyspark.sql import SparkSession

Let's create a new SparkSession through the builder attribute and the getOrCreate() method.


In [24]:
spark = SparkSession.builder\
        .master("local")\
        .appName("LinearRegressionModel")\
        .config("spark.executor.memory","1gb")\
        .getOrCreate()
        
sc = spark.sparkContext

Setting the master and appName attributes isn't actually critical in this introduction, nor is configuring the memory options for the executor. I've included them here for the sake of thoroughness.

NOTE: If you find the Spark tutorial on the Spark documentation web page it includes the following line of code:

spark = SparkSession.builder().appName(appName).master(master).getOrCreate()

That does not work in PySpark: builder is an attribute, not a callable method, and you must pass strings (e.g. "LinearRegressionModel" and "local") as arguments to the appName() and master() methods.

Moving on...

From here we can create a couple of RDDs: one with the data and another with the domain information, the header


In [25]:
rdd = sc.textFile('data/CaliforniaHousing/cal_housing.data')
header = sc.textFile('data/CaliforniaHousing/cal_housing.domain')

Part of what allows for some of the speed-up in Spark applications is that Spark evaluation is mostly lazy. So executing the following line of code isn't very useful:


In [26]:
header


Out[26]:
data/CaliforniaHousing/cal_housing.domain MapPartitionsRDD[33] at textFile at NativeMethodAccessorImpl.java:0

Instead we have to perform an action on the RDD, such as collect(), to materialize the data represented by the RDD abstraction.


In [27]:
header.collect()


Out[27]:
['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']

NOTE: collect() is a pretty dangerous action: if the RDD is especially large then your executor may run out of RAM and your application will crash. If you're working with especially large data and you just want a peek at it to try to suss out its structure, then try take() or first()


In [28]:
rdd.take(2)


Out[28]:
['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

Since we read the data in with textFile() we just have a set of comma-separated strings as our data. Let's split each line into separate entries using the map() function.


In [29]:
rdd = rdd.map(lambda line: line.split(","))
rdd.take(2)


Out[29]:
[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

Now we have something more closely resembling a collection of records.

But, notice that the data does not have a header and is mostly unstructured.

We can fix that by converting the data to a DataFrame.

I have been told that in general DataFrames perform better than RDDs, especially when using Python...


In [15]:
from pyspark.sql import Row

df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()
df.show()


+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37.850000|-122.250000|   269700.000000|    4.036800| 413.000000|   213.000000| 919.000000|
| 514.000000|       52.000000|37.840000|-122.250000|   299200.000000|    3.659100|1094.000000|   489.000000|2535.000000|
| 647.000000|       52.000000|37.840000|-122.250000|   241400.000000|    3.120000|1157.000000|   687.000000|3104.000000|
| 595.000000|       42.000000|37.840000|-122.260000|   226700.000000|    2.080400|1206.000000|   665.000000|2555.000000|
| 714.000000|       52.000000|37.840000|-122.250000|   261100.000000|    3.691200|1551.000000|   707.000000|3549.000000|
| 402.000000|       52.000000|37.850000|-122.260000|   281500.000000|    3.203100| 910.000000|   434.000000|2202.000000|
| 734.000000|       52.000000|37.850000|-122.260000|   241800.000000|    3.270500|1504.000000|   752.000000|3503.000000|
| 468.000000|       52.000000|37.850000|-122.260000|   213500.000000|    3.075000|1098.000000|   474.000000|2491.000000|
| 174.000000|       52.000000|37.840000|-122.260000|   191300.000000|    2.673600| 345.000000|   191.000000| 696.000000|
| 620.000000|       52.000000|37.850000|-122.260000|   159200.000000|    1.916700|1212.000000|   626.000000|2643.000000|
| 264.000000|       50.000000|37.850000|-122.260000|   140000.000000|    2.125000| 697.000000|   283.000000|1120.000000|
| 331.000000|       52.000000|37.850000|-122.270000|   152500.000000|    2.775000| 793.000000|   347.000000|1966.000000|
| 303.000000|       52.000000|37.850000|-122.270000|   155500.000000|    2.120200| 648.000000|   293.000000|1228.000000|
| 419.000000|       50.000000|37.840000|-122.260000|   158700.000000|    1.991100| 990.000000|   455.000000|2239.000000|
| 275.000000|       52.000000|37.840000|-122.270000|   162900.000000|    2.603300| 690.000000|   298.000000|1503.000000|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
only showing top 20 rows

To examine the data types associated with the dataframe, use the printSchema() method.


In [16]:
df.printSchema()


root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)

Since we used the textFile() method to read our data in, the data types are all strings. The following would cast all of the columns to floats instead:


In [ ]:
from pyspark.sql.types import *

df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
   .withColumn("latitude", df["latitude"].cast(FloatType())) \
   .withColumn("housingMedianAge", df["housingMedianAge"].cast(FloatType())) \
   .withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \
   .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \
   .withColumn("population", df["population"].cast(FloatType())) \
   .withColumn("households", df["households"].cast(FloatType())) \
   .withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \
   .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))

But that seems pretty inefficient, and it is. We can write a function to handle all of this for us:


In [18]:
from pyspark.sql.types import *

def convertCols(df,names,dataType): #df - a dataframe, names - a list of col names, dataType - the cast conversion type
    for name in names:
        df = df.withColumn(name,df[name].cast(dataType))
    return df

names = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome',\
         'population', 'totalBedRooms', 'totalRooms']


df = convertCols(df,names,FloatType())
df.printSchema()


root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)


In [19]:
df.show(10)


+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|        352100.0|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|        341300.0|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|        342200.0|      3.8462|     565.0|        280.0|    1627.0|
|     193.0|            52.0|   37.85|  -122.25|        269700.0|      4.0368|     413.0|        213.0|     919.0|
|     514.0|            52.0|   37.84|  -122.25|        299200.0|      3.6591|    1094.0|        489.0|    2535.0|
|     647.0|            52.0|   37.84|  -122.25|        241400.0|        3.12|    1157.0|        687.0|    3104.0|
|     595.0|            42.0|   37.84|  -122.26|        226700.0|      2.0804|    1206.0|        665.0|    2555.0|
|     714.0|            52.0|   37.84|  -122.25|        261100.0|      3.6912|    1551.0|        707.0|    3549.0|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
only showing top 10 rows

The pyspark.sql package has lots of convenient data exploration methods built in that mirror SQL query execution. For example, we can select columns:


In [20]:
df.select('population','totalBedrooms').show(10)


+----------+-------------+
|population|totalBedrooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows

We can use the filter() method to perform a classic SELECT ... FROM ... WHERE query as below:


In [21]:
ndf = df.select('population','totalBedrooms').filter(df['totalBedrooms'] > 500)
ndf.show(10)


+----------+-------------+
|population|totalBedrooms|
+----------+-------------+
|    2401.0|       1106.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
|    1504.0|        752.0|
|    1212.0|        626.0|
|    1015.0|        541.0|
|    1258.0|        574.0|
|    1377.0|        715.0|
|    1959.0|        853.0|
+----------+-------------+
only showing top 10 rows

And we can get summary statistics pretty easily too...


In [160]:
df.describe().show()


+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.8998217183639696|  1132.46212176534| 421.247905943133|2181.6152515827944|
|    min|              1.0|               1.0|            32.54|            -124.35|           14999.0|            0.4999|               3.0|              1.0|               2.0|
|    max|           6082.0|              52.0|            41.95|            -114.31|          500001.0|           15.0001|           35682.0|           6445.0|           39320.0|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+

Let's do a quick bit of feature engineering and transformation to optimize a linear regression on our feature set...


In [161]:
# Import all from `sql.functions` 
from pyspark.sql.functions import *

df.show()

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

df.show()


+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|        452600.0|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|        358500.0|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|        352100.0|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|        341300.0|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|        342200.0|      3.8462|     565.0|        280.0|    1627.0|
|     193.0|            52.0|   37.85|  -122.25|        269700.0|      4.0368|     413.0|        213.0|     919.0|
|     514.0|            52.0|   37.84|  -122.25|        299200.0|      3.6591|    1094.0|        489.0|    2535.0|
|     647.0|            52.0|   37.84|  -122.25|        241400.0|        3.12|    1157.0|        687.0|    3104.0|
|     595.0|            42.0|   37.84|  -122.26|        226700.0|      2.0804|    1206.0|        665.0|    2555.0|
|     714.0|            52.0|   37.84|  -122.25|        261100.0|      3.6912|    1551.0|        707.0|    3549.0|
|     402.0|            52.0|   37.85|  -122.26|        281500.0|      3.2031|     910.0|        434.0|    2202.0|
|     734.0|            52.0|   37.85|  -122.26|        241800.0|      3.2705|    1504.0|        752.0|    3503.0|
|     468.0|            52.0|   37.85|  -122.26|        213500.0|       3.075|    1098.0|        474.0|    2491.0|
|     174.0|            52.0|   37.84|  -122.26|        191300.0|      2.6736|     345.0|        191.0|     696.0|
|     620.0|            52.0|   37.85|  -122.26|        159200.0|      1.9167|    1212.0|        626.0|    2643.0|
|     264.0|            50.0|   37.85|  -122.26|        140000.0|       2.125|     697.0|        283.0|    1120.0|
|     331.0|            52.0|   37.85|  -122.27|        152500.0|       2.775|     793.0|        347.0|    1966.0|
|     303.0|            52.0|   37.85|  -122.27|        155500.0|      2.1202|     648.0|        293.0|    1228.0|
|     419.0|            50.0|   37.84|  -122.26|        158700.0|      1.9911|     990.0|        455.0|    2239.0|
|     275.0|            52.0|   37.84|  -122.27|        162900.0|      2.6033|     690.0|        298.0|    1503.0|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
only showing top 20 rows

+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|households|housingMedianAge|latitude|longitude|medianHouseValue|medianIncome|population|totalBedRooms|totalRooms|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
|     126.0|            41.0|   37.88|  -122.23|           4.526|      8.3252|     322.0|        129.0|     880.0|
|    1138.0|            21.0|   37.86|  -122.22|           3.585|      8.3014|    2401.0|       1106.0|    7099.0|
|     177.0|            52.0|   37.85|  -122.24|           3.521|      7.2574|     496.0|        190.0|    1467.0|
|     219.0|            52.0|   37.85|  -122.25|           3.413|      5.6431|     558.0|        235.0|    1274.0|
|     259.0|            52.0|   37.85|  -122.25|           3.422|      3.8462|     565.0|        280.0|    1627.0|
|     193.0|            52.0|   37.85|  -122.25|           2.697|      4.0368|     413.0|        213.0|     919.0|
|     514.0|            52.0|   37.84|  -122.25|           2.992|      3.6591|    1094.0|        489.0|    2535.0|
|     647.0|            52.0|   37.84|  -122.25|           2.414|        3.12|    1157.0|        687.0|    3104.0|
|     595.0|            42.0|   37.84|  -122.26|           2.267|      2.0804|    1206.0|        665.0|    2555.0|
|     714.0|            52.0|   37.84|  -122.25|           2.611|      3.6912|    1551.0|        707.0|    3549.0|
|     402.0|            52.0|   37.85|  -122.26|           2.815|      3.2031|     910.0|        434.0|    2202.0|
|     734.0|            52.0|   37.85|  -122.26|           2.418|      3.2705|    1504.0|        752.0|    3503.0|
|     468.0|            52.0|   37.85|  -122.26|           2.135|       3.075|    1098.0|        474.0|    2491.0|
|     174.0|            52.0|   37.84|  -122.26|           1.913|      2.6736|     345.0|        191.0|     696.0|
|     620.0|            52.0|   37.85|  -122.26|           1.592|      1.9167|    1212.0|        626.0|    2643.0|
|     264.0|            50.0|   37.85|  -122.26|             1.4|       2.125|     697.0|        283.0|    1120.0|
|     331.0|            52.0|   37.85|  -122.27|           1.525|       2.775|     793.0|        347.0|    1966.0|
|     303.0|            52.0|   37.85|  -122.27|           1.555|      2.1202|     648.0|        293.0|    1228.0|
|     419.0|            50.0|   37.84|  -122.26|           1.587|      1.9911|     990.0|        455.0|    2239.0|
|     275.0|            52.0|   37.84|  -122.27|           1.629|      2.6033|     690.0|        298.0|    1503.0|
+----------+----------------+--------+---------+----------------+------------+----------+-------------+----------+
only showing top 20 rows

We can examine the column of medianHouseValue in the above outputs to make sure that we transformed the data correctly.

Let's do some more feature engineering and standardization.


In [162]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()


Out[162]:
Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

Notice that we're using the col() function to specify that we're operating on columns in our calculations. The expression col("totalRooms")/col("households") acts like a numpy array, dividing the two columns element-wise.

Next we'll use the select() method to reorder the columns so that our response variable, medianHouseValue, comes first.


In [164]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

Now we're going to isolate the response variable (the labels) from the predictor variables using a DenseVector, which is essentially a numpy ndarray.


In [165]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

There are all kinds of great machine learning algorithms and functions already built into PySpark in the Spark ML library. If you're interested in more data pipelining, try visiting this page: https://spark.apache.org/docs/latest/ml-pipeline.html


In [166]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)


Out[166]:
[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

We can divide the data into training and testing sets using the PySpark SQL randomSplit() method.


In [171]:
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

Now we can create the regression model. The original tutorial directs you to the following URL for information on the linear regression model class:

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression


In [169]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [170]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]


Out[170]:
[(1.1340115638008952, 0.14999),
 (1.4485018834650096, 0.14999),
 (1.5713396046425587, 0.14999),
 (1.7496542762527307, 0.283),
 (1.2438468929500472, 0.366)]

To evaluate the model, we can inspect the model parameters.


In [184]:
# Coefficients for the model
linearModel.coefficients


Out[184]:
DenseVector([0.0, 0.0, 0.0, 0.2796, 0.0, 0.0, 0.0])

In [185]:
# Intercept for the model
linearModel.intercept


Out[185]:
0.9841344205626824

And summary data for the model is available as well.


In [182]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError


Out[182]:
0.8765335684459216

In [183]:
# Get the R2
linearModel.summary.r2


Out[183]:
0.42282227755911483
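As an aside, the scaling and regression steps above could also be chained together with the pyspark.ml Pipeline API mentioned earlier. A minimal sketch, under the assumption that the train/test split is redone on the unscaled df (the DataFrame with the "label" and "features" columns) so that the scaler runs inside the pipeline:


In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression

# hypothetical split of the unscaled frame
train_raw, test_raw = df.randomSplit([.8, .2], seed=1234)

scaler = StandardScaler(inputCol="features", outputCol="features_scaled")
lr = LinearRegression(featuresCol="features_scaled", labelCol="label",
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)

# fit the scaler and then the regression in one step, then predict on the held-out split
pipelineModel = Pipeline(stages=[scaler, lr]).fit(train_raw)
predictions = pipelineModel.transform(test_raw)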

Stop the Spark session...


In [233]:
spark.stop()

On to Dask

DASK was the acronym used to name the first computer ever built in Denmark in the 1950s

NOTE:Not the first computer, just the first computer in Denmark

"Dask is a parallel programming library that combines with the Numeric Python ecosystem to provide parallel data frames, arrays, machine learning, and custom algorithms." - Dask Documentation

A few Differences from Spark

  1. Dask may be deployed on a cluster, but it is intended mostly for use on a single machine.
    • Design focus was on efficient memory usage as opposed to ensuring reliability in the distributed environment
  2. Dask is intended to be smaller and more lightweight than Spark.
    • Thus it offers less functionality, but is designed to play nice with other data science packages.
  3. Dask is written in Python, whereas Spark is written in Scala
    • Debugging computational code in Spark is complicated by the added JVM and serialization layers
    • Debugging computational code in Dask is just like debugging any other Python computational code
  4. While Spark is built around RDDs, the main Dask data structure is a generic task graph with arbitrary data dependencies
    • The low level, generic construction offers greater flexibility for developing more general computational algorithms
    • Cited by Dask developers as the largest single difference between the two
  5. Task Graph Granularity
    • Spark task graphs are a high level view of an application, which is interpreted at execution time then dispatched to compute nodes and performed in the distributed environment as "many little tasks"
    • A Dask task graph is a low level view of the "many little tasks"
    • As such, Dask is not optimized in the same way as Spark, but provides more transparency to the developer

Essentially

If you are already working with Big Data hardware, developing in Scala / Java, and you want one software package to rule them all, Spark is your guy.

If, on the other hand, you are developing in Python using various Numeric Python libraries and you wish to add parallelism to already existing projects, the lightweight Dask may be more appropriate.

Dask Essentials

Parallelism

  • Achieved primarily through lazy function evaluation
  • Exposed to the programmer through the @delayed decorator on user-defined functions or the dask.delayed function available in the library.
  • To evaluate the lazy values, use the compute() function
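
For example, a minimal sketch of the delayed pattern (the inc and add functions here are just illustrative):

from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def add(x, y):
    return x + y

a = inc(1)            # lazy - nothing has run yet
b = inc(2)
c = add(a, b)         # builds a small task graph
print(c.compute())    # evaluation happens here -> prints 5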

Dask Bag

  • Parallel list data structure for "messy data"
  • Data in the list can be mixed type, contain complex nested structures, missing values, etc.
  • Implements all of the typical functional API you're used to: map, filter, groupby, etc.
  • You can use the take() function to peek at the data you create with the above methods without calling the compute() method
  • Bags provide very general computation - practically ANY python function
  • Limitations:
    1. Bags : DataFrames :: List : Numpy.ndarray i.e. - SLOW
    2. Bag.groupby is particularly slow, and its replacement, Bag.foldby, is confusing
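
Despite the caveats, the basic Bag workflow is straightforward. A small sketch with made-up records (the records and field names are illustrative):

import dask.bag as db

records = db.from_sequence([
    {"name": "alice", "score": 10},
    {"name": "bob"},                     # missing value
    {"name": "carol", "score": 7},
])

# lazy, functional-style operations
scores = records.filter(lambda r: "score" in r).map(lambda r: r["score"])

scores.take(2)            # peek at a couple of results without a full compute()
scores.sum().compute()    # -> 17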

Distributed Dask

  • I'm not focusing on this, refer to the Dask tutorial...

Dask Arrays

  • Essentially distributed Numpy
  • Employs parallel computation using all of your computer's cores
  • Effectively streams larger-than-memory data from disk
  • Q: How long does it take to sum 1 billion numbers?
    • A: around 5 seconds...
  • A large set of the Numpy API has been implemented
  • Limitations:
    • np.linalg is not yet implemented
    • fancy indexing and operations like np.where() are limited
    • no sorting!
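
A quick sketch of the billion-number sum (the chunk size is an arbitrary choice):

import dask.array as da

x = da.random.random(10**9, chunks=10**7)   # one billion numbers in 100 chunks
total = x.sum()                             # lazy: builds the task graph
print(total.compute())                      # evaluated in parallel across your cores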

Dataframes

  • A blocked, parallel dataframe object which mimics the Pandas dataframe to a large degree
  • Actually constructed of several independent Pandas dataframes smooshed together
  • Distributed Dask allows you to use dataframes across a cluster
  • Whenever Pandas releases the Global Interpreter Lock (GIL), Dask can run several Pandas operations in parallel
    • Speedup on the order of the number of cores
  • Limitations:
    • Pandas is more mature and fully functional
      • Non-trivial bugs are frequently reported in Dask dataframes (quickly fixed... :/ )
      • If your data fits in memory, use Pandas
    • Only a small subset of Pandas is currently implemented
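
A small sketch of the idea, reusing the housing column names purely for illustration:

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"population":    [322.0, 2401.0, 496.0, 1157.0],
                    "totalBedrooms": [129.0, 1106.0, 190.0, 687.0]})

# a Dask dataframe is several pandas dataframes partitioned along the index
ddf = dd.from_pandas(pdf, npartitions=2)

# most of the familiar pandas API works, lazily
mean_pop = ddf[ddf["totalBedrooms"] > 500]["population"].mean()
print(mean_pop.compute())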