We'll talk a little bit about Spark's precursor, Hadoop, and then we'll discuss the advantages and utility of Spark on top of Hadoop.
Next, we'll discuss what Dask offers in conjunction with Spark.
Finally, we will implement a simple Spark application using the PySpark API.
* Mapper class (a user-defined class that extends Mapper) which implements a map function
* The map function takes in a pair, <k1,v1>, and maps it to a new pair, <k2,v2>
* For word count, the output <k,v> pairs are <w,c> pairs
* The map function emits a <k,v> = <w,1> pair for each word in its chunk
* The reduce function combines those <w,1> pairs into <w,c> pairs by summing all v with the same k
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
1. Every application has to be pigeonholed into the MAP/REDUCE paradigm
2. Studies showed that up to 90% of computation wall time was spent in file I/O
3. Iterative algorithms were especially slow when implemented with MAP/REDUCE
* The application may bottleneck due to a small number of particularly slow nodes
Spark was donated to the Apache Software Foundation in 2013
## RDD - Resilient Distributed Dataset
The main data structure for Spark applications
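As a rough sketch of how compact the RDD API is compared with the Java MapReduce code above, here is the same word count in a few lines of PySpark. This is not from the original presentation; it assumes a SparkContext named sc (like the one created later in this notebook) and a hypothetical input file.
lines = sc.textFile("input.txt")                        # hypothetical input file
counts = (lines.flatMap(lambda line: line.split())      # split each line into words
               .map(lambda w: (w, 1))                   # emit <w, 1> pairs
               .reduceByKey(lambda a, b: a + b))        # sum the counts for each word
counts.take(5)                                          # action: materialize a small sample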
This piece of the presentation borrows heavily from a July 2017 DataCamp tutorial on machine learning and PySpark. Visit the following URL to see the original tutorial:
https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning#gs.Y3MIPIY
We're going to explore some data and run some iterative algorithms on it. You can find the data here: http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html
Download the cal_housing.tgz tarball at the bottom of the page and extract it somewhere obvious.
First we need to use the findspark package in order to locate the various Spark packages that we need (e.g., pyspark). You can download and install findspark using pip as I did, or I'm sure Anaconda will also work. There are probably other ways of making sure you can locate the packages you need, but this was the simplest and most straightforward I found.
In [1]:
import findspark
findspark.init()
NOTE: This is only necessary in the Jupyter Notebook. You should be able to import the necessary packages in a regular Python script without using findspark
The SparkSession is the entry point for any Spark application.
In [ ]:
from pyspark.sql import SparkSession
Let's create a new SparkSession through the builder attribute and the getOrCreate() method.
In [24]:
spark = SparkSession.builder\
.master("local")\
.appName("LinearRegressionModel")\
.config("spark.executor.memory","1gb")\
.getOrCreate()
sc = spark.sparkContext
Initializing the master and appName attributes isn't actually critical in this introduction, nor is configuring the memory options for the executor. I've included them here for the sake of thoroughness.
NOTE: If you find the Spark tutorial on the Spark documentation web page, it includes the following line of code:
spark = SparkSession.builder().appName(appName).master(master).getOrCreate()
That does not work as written in PySpark: builder is an attribute, not a method (no parentheses), and you must pass a string as the argument to the appName() and master() methods.
Moving on...
From here we can create a couple of RDDs: one with the data and another with the domain information, the header
In [25]:
rdd = sc.textFile('data/CaliforniaHousing/cal_housing.data')
header = sc.textFile('data/CaliforniaHousing/cal_housing.domain')
Part of what allows for the speed-up in Spark applications is that Spark evaluations are mostly lazy. So executing the following line of code isn't very useful:
In [26]:
header
Out[26]:
Instead we have to take an action on the RDD, such as collect(), to materialize the data represented by the RDD abstraction.
In [27]:
header.collect()
Out[27]:
NOTE: collect() is a pretty dangerous action: if the RDD is especially large, your executor may run out of RAM and your application will crash. If you're using especially large data and you just want a peek at it to try to suss out its structure, try take() or first() instead.
In [28]:
rdd.take(2)
Out[28]:
Since we read the data in with textFile(), our data is just a set of comma-separated strings. Let's split the data into separate entries using the map() function.
In [29]:
rdd = rdd.map(lambda line: line.split(","))
rdd.take(2)
Out[29]:
Now we have something more closely resembling a collection of records.
But notice that the data does not have a header and is mostly unstructured.
We can fix that by converting the data to a DataFrame.
I have been told that in general DataFrames perform better than RDDs, especially when using Python...
In [15]:
from pyspark.sql import Row
df = rdd.map(lambda line: Row(longitude=line[0],
latitude=line[1],
housingMedianAge=line[2],
totalRooms=line[3],
totalBedRooms=line[4],
population=line[5],
households=line[6],
medianIncome=line[7],
medianHouseValue=line[8])).toDF()
df.show()
To examine the data types associated with the DataFrame, use the printSchema() method.
In [16]:
df.printSchema()
Since we used the textFile() method to read our data in, the data types are all string. The following would cast all of the columns to floats instead:
In [ ]:
from pyspark.sql.types import *
df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
.withColumn("latitude", df["latitude"].cast(FloatType())) \
.withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType())) \
.withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \
.withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \
.withColumn("population", df["population"].cast(FloatType())) \
.withColumn("households", df["households"].cast(FloatType())) \
.withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \
.withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))
But that seems pretty inefficient, and it is. We can write a function to handle all of this for us:
In [18]:
from pyspark.sql.types import *
def convertCols(df,names,dataType): #df - a dataframe, names - a list of col names, dataType - the cast conversion type
for name in names:
df = df.withColumn(name,df[name].cast(dataType))
return df
names = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome',\
'population', 'totalBedRooms', 'totalRooms']
df = convertCols(df,names,FloatType())
df.printSchema()
In [19]:
df.show(10)
The pyspark.sql package has lots of convenient data exploration methods built in and supports SQL query execution. For example, we can select by columns:
In [20]:
df.select('population','totalBedrooms').show(10)
We can use the filter() method to perform a classic SELECT FROM WHERE query, as below:
In [21]:
ndf = df.select('population','totalBedrooms').filter(df['totalBedrooms'] > 500)
ndf.show(10)
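Because pyspark.sql supports real SQL execution, the same query can also be written as a SQL string by registering the DataFrame as a temporary view. This is a minimal sketch rather than part of the original tutorial, and the view name tempHousing is just an illustration:
df.createOrReplaceTempView("tempHousing")   # expose the DataFrame to the SQL engine
spark.sql("SELECT population, totalBedRooms FROM tempHousing WHERE totalBedRooms > 500").show(10)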
And we can get summary statistics pretty easily too...
In [160]:
df.describe().show()
Let's do a quick bit of feature engineering and transformation to optimize a linear regression on our feature set...
In [161]:
# Import all from `sql.functions`
from pyspark.sql.functions import *
df.show()
# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)
df.show()
We can examine the medianHouseValue column in the outputs above to make sure that we transformed the data correctly.
Let's do some more feature engineering and standardization.
In [162]:
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *
# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))
# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))
# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))
# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
.withColumn("populationPerHousehold", col("population")/col("households")) \
.withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
# Inspect the result
df.first()
Out[162]:
Notice that we're using the col() function to specify that we're operating on columnar data in our calculations. The expression col("totalRooms")/col("households") acts like a numpy array, dividing the columns element-wise.
Next we'll use the select() method to reorder the data so that our response variable, medianHouseValue, comes first.
In [164]:
# Re-order and select columns
df = df.select("medianHouseValue",
"totalBedRooms",
"population",
"households",
"medianIncome",
"roomsPerHousehold",
"populationPerHousehold",
"bedroomsPerRoom")
Now we're going to isolate the response variable (the labels) from the predictor variables using a DenseVector, which is essentially a numpy ndarray.
In [165]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector
# Define the `input_data`
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])
There are all kinds of great machine learning algorithms and functions already built into PySpark in the Spark ML library. If you're interested in more data pipelining, try visiting this page: https://spark.apache.org/docs/latest/ml-pipeline.html
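As a hedged sketch of what such a pipeline might look like, the scaling and regression steps below could be chained into a single estimator. The original tutorial runs them separately, so this is only illustrative:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.regression import LinearRegression
scaler = StandardScaler(inputCol="features", outputCol="features_scaled")
lr = LinearRegression(labelCol="label", featuresCol="features_scaled")
pipeline = Pipeline(stages=[scaler, lr])     # chain both stages into one estimator
# pipelineModel = pipeline.fit(df)           # would fit the scaler and the model in one call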
In [166]:
# Import `StandardScaler`
from pyspark.ml.feature import StandardScaler
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)
# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)
# Inspect the result
scaled_df.take(2)
Out[166]:
We can divide the data into training and testing sets using the PySpark SQL randomSplit() method.
In [171]:
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)
Now we can create the regression model. The original tutorial points to the Spark ML documentation for more information on the linear regression model class.
In [169]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression
# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the data to the model
linearModel = lr.fit(train_data)
In [170]:
# Generate predictions
predicted = linearModel.transform(test_data)
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])
# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()
# Print out first 5 instances of `predictionAndLabel`
predictionAndLabel[:5]
Out[170]:
To evaluate the model, we can inspect the model parameters.
In [184]:
# Coefficients for the model
linearModel.coefficients
Out[184]:
In [185]:
# Intercept for the model
linearModel.intercept
Out[185]:
And summary data for the model is available as well.
In [182]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError
Out[182]:
In [183]:
# Get the R2
linearModel.summary.r2
Out[183]:
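Note that the summary metrics above are computed on the training data. As a sketch that is not part of the original tutorial, the same metrics could be computed on the held-out test predictions with RegressionEvaluator:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator.evaluate(predicted)                      # RMSE on the test set
evaluator.setMetricName("r2").evaluate(predicted)  # R^2 on the test set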
Stop the Spark session...
In [233]:
spark.stop()
DASK was the acronym used to name the first computer ever built in Denmark in the 1950s
NOTE: Not the first computer ever, just the first computer in Denmark
"Dask is a parallel programming library that combines with the Numeric Python ecosystem to provide parallel data frames, arrays, machine learning, and custom algorithms." - Dask Documentation
If you are already working with Big Data hardware and developing in Scala/Java, and you want one software package to rule them all, Spark is your guy.
If, on the other hand, you are developing in Python with various Numeric Python libraries and you wish to add parallelism to already-existing projects, the lightweight Dask may be more appropriate.
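As a quick, hedged illustration of that pandas-style workflow (not from the original slides), dask.dataframe mirrors the pandas API, so the same California housing file could be read and summarized like this:
import dask.dataframe as dd
ddf = dd.read_csv("data/CaliforniaHousing/cal_housing.data", header=None)  # lazy, partitioned read
print(ddf.mean().compute())   # compute() triggers the parallel work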
* Parallelism comes from the @delayed annotation on user-defined functions, or from the dask.delayed function available in the library (see the sketch below)
* Deferred results are materialized with the compute() function
* Dask Bags support map, filter, groupby, etc.
* Use the take() function to peek at the data you create with the above methods without calling the compute() method
* Bag.groupby is particularly slow, and its replacement, Bag.foldby, is confusing
* np.where()
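Here is a minimal dask.delayed sketch, referenced in the list above. It is not from the original slides, and the inc and add functions are hypothetical:
from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def add(x, y):
    return x + y

a = inc(1)              # nothing runs yet; this just builds a task graph
b = inc(2)
total = add(a, b)
print(total.compute())  # compute() executes the graph in parallel -> 5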