MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as ML algorithms (classification, regression, clustering, collaborative filtering), featurization, pipelines, persistence, and utilities.
Spark provides two machine learning APIs:
* spark.mllib: the RDD-based API
* spark.ml: the DataFrame-based API
spark.ml is the primary ML library in Spark. spark.mllib is in maintenance mode: it can still be used and will receive bug fixes, but it will not get any new features.
As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.
DataFrames provide a more user-friendly API than RDDs. The many benefits of DataFrames include Spark Datasources, SQL/DataFrame queries, Tungsten and Catalyst optimizations, and uniform APIs across languages.
In [1]:
from pyspark.sql import SparkSession
import pyspark
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sc = spark.sparkContext
In [2]:
df = spark.read.csv(path = '../data/papers.csv', header = False,inferSchema = True)
In [4]:
df.printSchema()
In [6]:
df.show(5)
source: https://datahack.analyticsvidhya.com/contest/black-friday
The data set also contains customer demographics (age, gender, marital status, city type, stay in current city), product details (product ID and product category) and the total purchase amount from last month.
Variable | Definition |
---|---|
User_ID | User ID |
Product_ID | Product ID |
Gender | Sex of User |
Age | Age in bins |
Occupation | Occupation (Masked) |
City_Category | Category of the City (A,B,C) |
Stay_In_Current_City_Years | Number of years of stay in the current city |
Marital_Status | Marital Status |
Product_Category_1 | Product Category (Masked) |
Product_Category_2 | Product may also belong to another category (Masked) |
Product_Category_3 | Product may also belong to another category (Masked) |
Purchase | Purchase Amount (Target Variable) |
In [7]:
bf_train = spark.read.csv(path = '../data/blackfriday_train.csv', header = True,inferSchema = True)
bf_test = spark.read.csv(path = '../data/blackfriday_test.csv', header = True,inferSchema = True)
In [8]:
bf_train.printSchema()
source: https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/
To see the column types of a DataFrame, we can use printSchema. Calling printSchema() on a DataFrame shows the schema in a tree format.
In [9]:
df.printSchema()
In [10]:
df.head(5)
Out[10]:
To see the result formatted as DataFrame output, we can use the show operation.
We can pass the argument truncate = True to truncate the result (long rows won't be shown completely).
Note that show does not return any data; it just prints the DataFrame contents.
In [11]:
print(type(df.show(2,truncate= True)))
#df.show(2,truncate= True)
In [12]:
df.count()
Out[12]:
In [13]:
len(df.columns), df.columns
Out[13]:
The describe operation is used to calculate summary statistics for the numerical columns of a DataFrame. If we don’t specify column names, it calculates summary statistics for all numerical columns present in the DataFrame.
In [14]:
df.describe().show()
Note that describe() is a transformation, so the result is a DataFrame that we can collect or transform again.
In [16]:
df.describe()
Out[16]:
In [17]:
type(df.describe())
Out[17]:
As we can see, the describe operation also works for String columns, but the output for mean and stddev is null, and the min and max values are calculated based on the lexicographic order (ASCII values) of the categories.
In [ ]:
df.describe("_c0").show()
In [18]:
unique_elements = df.select('_c0').distinct().collect()
len(unique_elements)
Out[18]:
In [ ]:
df.select('_c0').distinct().count()
In [23]:
# if we want to see the different conferences
df.select("_c3").distinct().show(5)
In [24]:
df2 = spark.read.csv(path = '../data/people.csv', header = True,inferSchema = True)
In [25]:
df2.crosstab('Age[years]', 'Sex').show()
In the above output, the first column of each row contains the distinct values of Age and the column names are the distinct values of Sex. The name of the first column is Age[years]_Sex. Pairs with no occurrences have a zero count in the contingency table.
We can use the dropDuplicates operation to drop the duplicate rows of a DataFrame and get a DataFrame without duplicate rows.
If we apply it to the two columns Sex and Eye Color of df2, we get all the unique rows for these columns.
In [27]:
df2.select('Sex', 'Eye Color').show()
In [26]:
df2.select('Sex', 'Eye Color').dropDuplicates().show()
In [28]:
df2.select('Sex', 'Eye Color').count()
Out[28]:
In [29]:
df2.select('Sex', 'Eye Color').dropDuplicates().count()
Out[29]:
The dropna operation can be used here. To decide which rows to drop from the DataFrame, it considers three options:
* how – 'any' or 'all'. If 'any', drop a row if it contains any nulls. If 'all', drop a row only if all its values are null.
* thresh – int, default None. If specified, drop rows that have fewer than thresh non-null values. This overrides the how parameter.
* subset – optional list of column names to consider.

Let's drop the null rows of bf_train with the default parameters and count the rows in the output DataFrame. The default options are 'any', None, None for how, thresh, subset respectively.
In [30]:
bf_train.count()
Out[30]:
In [31]:
bf_train.dropna().count()
Out[31]:
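To see how the other options behave, here is a quick sketch (not part of the original notebook) using the thresh, how and subset parameters with the Product_Category columns from the schema above:
In [ ]:
# keep only rows with at least 10 non-null values (thresh overrides how)
print(bf_train.dropna(thresh=10).count())
# drop a row only if both optional product-category columns are null
print(bf_train.dropna(how='all', subset=['Product_Category_2', 'Product_Category_3']).count())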
The fillna operation can be used here. fillna takes two parameters to fill the null values:
* value – the value to put in place of the nulls.
* subset – optionally, a list of selected columns to fill.

Let's fill -1 in place of the null values in the train DataFrame.
In [ ]:
bf_train.show(2)
In [ ]:
bf_train.fillna(-1).show(2)
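fillna can also be restricted to specific columns via subset; a small sketch (not in the original notebook), again using the optional product-category columns:
In [ ]:
# fill nulls only in the two optional product-category columns
bf_train.fillna(-1, subset=['Product_Category_2', 'Product_Category_3']).show(2)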
We can apply the filter operation on the Purchase column of the bf_train DataFrame to keep only the rows with values greater than 15000. We just need to pass a condition. Let's apply the filter on the Purchase column of the train DataFrame and count the rows whose purchase is greater than 15000.
In [ ]:
bf_train.filter(bf_train.Purchase > 15000).count()
In [ ]:
bf_train.groupby('Age').agg({'Purchase': 'mean'}).show()
We can also apply sum, min, max and count with groupby when we want different summary insights for each group.
Let's take one more example of groupby to count the number of rows in each Age group.
In [ ]:
bf_train.groupby('Age').count().show()
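As a sketch of combining several aggregations in one pass (not in the original notebook), we can use the functions module:
In [ ]:
from pyspark.sql import functions as F
# sum, min, max and count of Purchase for each Age group in a single agg call
bf_train.groupby('Age').agg(F.sum('Purchase'), F.min('Purchase'), F.max('Purchase'), F.count('Purchase')).show()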
We can use the sample operation to take a sample of a DataFrame. The sample method on a DataFrame returns a DataFrame containing a sample of the base DataFrame. The sample method takes 3 parameters:
* withReplacement – True or False, to select observations with or without replacement.
* fraction – e.g., fraction = 0.5 means we want 50% of the data in the sample DataFrame.
* seed – to reproduce the result.

Let's create two DataFrames t1 and t2 from bf_train, each with a 20% sample of the train data, and count the number of rows in each.
In [ ]:
t1 = bf_train.sample(False, 0.2, 42)
t2 = bf_train.sample(False, 0.2, 43)
t1.count(),t2.count()
We can apply a function to each row of a DataFrame using the map operation on its underlying RDD. After applying the function, we get the result in the form of an RDD. Let's apply a map operation on the User_ID column of bf_train and print the first 5 elements of the mapped RDD (x, 1).
In [ ]:
bf_train.select('User_ID').rdd.map(lambda x:(x,1)).take(5)
We can use the orderBy operation on a DataFrame to get the output sorted by some column. The orderBy operation takes two arguments: the column(s) to sort by and the sort order.
Let's sort the train DataFrame by Purchase.
In [ ]:
bf_train.orderBy(bf_train.Purchase.desc()).show(5)
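For completeness, a small sketch (not in the original notebook) showing the ascending argument and sorting by more than one column:
In [ ]:
# ascending order on Purchase
bf_train.orderBy('Purchase', ascending=True).show(5)
# sort by Age ascending, then Purchase descending
bf_train.orderBy(['Age', 'Purchase'], ascending=[True, False]).show(5)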
We can use the withColumn operation to add a new column (or replace an existing one) in the base DataFrame and return a new DataFrame. The withColumn operation takes 2 parameters: the name of the new column and the expression that computes it.
Let's see how withColumn works by creating a new column Purchase_new in bf_train, calculated by dividing the Purchase column by 2.
In [ ]:
bf_train.withColumn('Purchase_new', bf_train.Purchase /2.0).select('Purchase','Purchase_new').show(5)
We can also use functions, such as user-defined functions (UDFs), with withColumn.
In [34]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
to_cat = udf(lambda x: "expensive" if x > 15000 else "cheap", StringType())
bf_train.withColumn('Purchase_cat', to_cat(bf_train["Purchase"])).select('Purchase_cat').show(5)
In [35]:
bf_test.drop('Product_Category_3').columns
Out[35]:
In [36]:
train, test = df.randomSplit([0.9, 0.1], seed=12345)
In [37]:
train.count()
Out[37]:
In [38]:
test.count()
Out[38]:
A vector is a one-dimensional array of elements.
The natural Python implementation of a vector is as a one-dimensional list. However, in many applications, the elements of a vector have mostly zero values. Such a vector is said to be sparse.
It is inefficient to use a one-dimensional list to store a sparse vector. It is also inefficient to add elements whose values are zero in forming sums of sparse vectors. Consequently, we should choose a different representation.
A sparse vector is represented by two parallel arrays: indices and values. Zero entries are not stored. A dense vector is backed by a double array representing its entries. For example, a vector [1., 0., 0., 0., 0., 0., 0., 3.] can be represented in the sparse format as (8, [0, 7], [1., 3.]), where 8 is the size of the vector and [0, 7] are the indices of its nonzero entries, as illustrated below:
In [39]:
from pyspark.ml.linalg import SparseVector, DenseVector, Matrices
In [40]:
sv1 = SparseVector(3, [0, 2], [1.0, 3.0])
sv1
Out[40]:
In [41]:
dv1 = DenseVector([1.0, 3.0])
dv1
Out[41]:
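To tie this back to the 8-element example from the text above, here is a small additional cell (not in the original notebook) that builds that vector in both representations and checks that they expand to the same array:
In [ ]:
sv2 = SparseVector(8, [0, 7], [1.0, 3.0])
dv2 = DenseVector([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 3.0])
sv2.toArray(), dv2.toArray()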
We can also have Sparse and Dense Matrices
In [ ]:
Matrices.dense(2,2,[1,2,3,4])
In [ ]:
sparse_mat = Matrices.sparse(2, 2, [0, 1, 2], [0, 1], [1, 1])
In [ ]:
dense_mat = Matrices.dense(2,2,[1,2,3,4])
Instead of representing each feature as a separate variable, it is common to represent all the variables of an instance as a single vector, which can indeed be sparse.
In [ ]:
sparse_df = sc.parallelize([
(1, SparseVector(10, {1: 1.0, 2: 1.0, 3: 2.0, 4: 1.0, 5: 3.0})),
(2, SparseVector(10, {9: 100.0})),
(3, SparseVector(10, {1: 1.0})),
]).toDF(["row_num", "features"])
sparse_df.show()
In [ ]:
dense_df = sc.parallelize([
(1, DenseVector([1,2,3,4])),
(2, DenseVector([1,2,3,4])),
(3, DenseVector([1,3,4,5])),
]).toDF(["row_num", "features"])
dense_df.show()
from: https://databricks.com/blog/2014/07/16/new-features-in-mllib-in-spark-1-0.html
For many large-scale datasets, it is not feasible to store the data in a dense format. Nevertheless, for medium-sized data, it is natural to ask when we should switch from a dense format to sparse. In MLlib, a sparse vector requires 12 * nnz + 4 bytes of storage, where nnz is the number of nonzeros, while a dense vector needs 8 * n bytes, where n is the vector size. So storage-wise, the sparse format is better than the dense format when more than 1/3 of the elements are zero. However, assuming that the data can be fit into memory in both formats, we usually need sparser data to observe a speedup, because the sparse format is not as efficient as the dense format in computation. Our experience suggests a sparsity of around 10%, while the exact switching point for the running time is indeed problem-dependent.
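A quick back-of-the-envelope check of that break-even point, based only on the two storage formulas quoted above (an illustrative calculation, not part of the original post):
In [ ]:
n = 1000                          # vector size
dense_bytes = 8 * n               # dense storage: 8 * n bytes
for nnz in (100, 500, 667, 900):  # number of nonzeros
    sparse_bytes = 12 * nnz + 4   # sparse storage: 12 * nnz + 4 bytes
    print(nnz, sparse_bytes, dense_bytes, 'sparse smaller' if sparse_bytes < dense_bytes else 'dense smaller')
# break-even at 12 * nnz + 4 = 8 * n, i.e. nnz ~ 2n/3: sparse wins when more than ~1/3 of the entries are zero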
from: https://spark.apache.org/docs/2.3.2/ml-pipeline.html
In this section, we introduce and practice with the concept of ML Pipelines.
ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.
MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Pipelines API, where the pipeline concept is mostly inspired by the scikit-learn project.
DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.
Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.
A Transformer is an abstraction that includes feature transformers and learned models.
Technically, a Transformer implements a method transform()
, which converts one DataFrame into another, generally by appending one or more columns.
For example:
A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.
An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data. Technically, an Estimator implements a method fit()
, which accepts a DataFrame and produces a Model, which is a Transformer.
For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit()
trains a LogisticRegressionModel, which is a Model and hence a Transformer.
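As a minimal illustration of this Estimator/Transformer pattern (a toy sketch using StandardScaler on a made-up DataFrame, not part of the original notebook):
In [ ]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

toy_df = spark.createDataFrame(
    [(Vectors.dense([1.0, 10.0]),), (Vectors.dense([2.0, 20.0]),), (Vectors.dense([3.0, 30.0]),)],
    ["features"])

scaler = StandardScaler(inputCol="features", outputCol="scaled")  # an Estimator
scaler_model = scaler.fit(toy_df)                                 # fit() returns a Model, which is a Transformer
scaler_model.transform(toy_df).show(truncate=False)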
We will see two examples of how to train supervised machine learning models. Supervised models learn from labeled datasets. This means that we have a dataset with some variables for each occurrence and a label for that occurrence.
The main objective is to predict a label for a new occurrence for which we don't have the label.
In the following example we will load the auto-mpg.csv
dataset. The description says (http://archive.ics.uci.edu/ml/datasets/Auto+MPG)
"The data concerns city-cycle fuel consumption in miles per gallon, to be predicted in terms of 3 multivalued discrete and 5 continuous attributes." (Quinlan, 1993)
So, first of all, let's load the dataset:
In [42]:
auto_df = spark.read.csv(path = '../data/auto-mpg.csv',
header = True,
inferSchema = True)
In [43]:
auto_df.printSchema()
Our main goal is to build a predictive model that takes some input variables representing the vehicle features, and outputs the consumption of the vehicle in miles per gallon.
In [44]:
pred_vars = ['cylinders', 'displacement', 'weight', 'acceleration', 'year', 'origin']
To predict the consumption we will use a Linear Regression model. We won't go into the details of the model itself, but we have to take into account that the model expects the input features as a single Vector column representing the vehicle characteristics.
To generate the training dataset we will use a VectorAssembler, which is a Transformer.
VectorAssembler takes a DataFrame as input and outputs the same DataFrame with the specified columns assembled into a vector column.
In [45]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(
inputCols = pred_vars,
outputCol = 'features')
train_df = vectorAssembler.transform(auto_df)
train_df = train_df.withColumn("label", auto_df["mpg"])
train_df = train_df.select(['features', 'label'])
train_df.show(3)
Then we create a LinearRegression Estimator (we skip the parameter details). Remember that the fitted lrModel is a Transformer.
In [47]:
from pyspark.ml.regression import LinearRegression
# LinearRegression is an Estimator
lr = LinearRegression(maxIter=10,
regParam=0.3,
elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(train_df)
# lrModel will contain a Transformer
type(lrModel)
Out[47]:
To make the predictions over the dataset, we just have to apply the transformer over the features of a certain dataset.
In [48]:
predictions = lrModel.transform(train_df.select(['features']))
predictions.show(5)
In [ ]:
from pyspark.ml.feature import Tokenizer

df = spark.createDataFrame([
(0,"the cat in the mat is flat"),
(1,"the mouse with the hat is nice")
], ["id","text"])
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tok_df = tokenizer.transform(df)
tok_df.select("words").collect()
In [ ]:
from pyspark.ml.feature import CountVectorizer
count_vec = CountVectorizer(inputCol="words", outputCol="features")
counter = count_vec.fit(tok_df)
count_df = counter.transform(tok_df)
count_df.show()
In [ ]:
counter.vocabulary
In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, countVectorizer, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
countVectorizer = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, countVectorizer, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))
The most straightforward approach to evaluate a supervised model is to split the original dataset into two subsets.
Training subset: this set is used to train the model. Normally, supervised machine learning algorithms try to minimize an error value, so they use the features and labels of the training dataset to learn a model that minimizes this error.
Test subset: this set is held out during training and is used only to evaluate how well the learned model generalizes to occurrences it has never seen.
However, if we expose the algorithm to too much learning, we may see that it begins to memorize the occurrences of the training dataset. This is the so-called overfitting problem, which leads to poor generalization: at the end of the day we have a model that performs very well on the training data but makes bad predictions on new occurrences.
To evaluate a regression model we can use metrics such as the mean absolute error (MAE), the mean squared error (MSE), the root mean squared error (RMSE) and the coefficient of determination (R2):
In [ ]:
auto_df = spark.read.csv(path = '../data/auto-mpg.csv',
header = True,
inferSchema = True)
pred_vars = ['cylinders', 'displacement', 'weight', 'acceleration', 'year', 'origin']
vectorAssembler = VectorAssembler(
inputCols = pred_vars,
outputCol = 'features')
vec_auto_df = vectorAssembler.transform(auto_df)
vec_auto_df = vec_auto_df.withColumn("label", auto_df["mpg"])
vec_auto_df = vec_auto_df.select(['features', 'label'])
train_auto_df, test_auto_df = vec_auto_df.randomSplit([0.9, 0.1], seed=12345)
In [ ]:
lr = LinearRegression(maxIter=10,
regParam=0.3,
elasticNetParam=0.8)
# Fit the model
lrModel = lr.fit(train_auto_df)
In [ ]:
predicted_auto_df = lrModel.transform(test_auto_df)
In [ ]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
labelCol="label", metricName="mae")
results = lr_evaluator.evaluate(predicted_auto_df)
print("R Squared (R2) on test data = %g" % results)
We will focus on the evaluation of a binary classification model. This means that the classification model distinguishes between two exclusive classes: "a" and "b", for example.
A binary classifier can be seen as a classifier telling whether an occurrence belongs to one of the classes, say "a". So, when the classifier outputs a true value, it means that the occurrence belongs to class "a". If the classifier outputs a false value, it means that the occurrence does not belong to class "a" and hence belongs to class "b".
source: https://en.wikipedia.org/wiki/Precision_and_recall
When we classify a new occurrence, we can have the following 4 cases:
* True positive (TP): the occurrence belongs to class "a" and the classifier predicts "a".
* False positive (FP): the occurrence belongs to class "b" but the classifier predicts "a".
* True negative (TN): the occurrence belongs to class "b" and the classifier predicts "b".
* False negative (FN): the occurrence belongs to class "a" but the classifier predicts "b".
source: https://en.wikipedia.org/wiki/Precision_and_recall
Taking these definitions into account we can define the following metrics:
* Accuracy = (TP + TN) / (TP + FP + TN + FN)
* Precision = TP / (TP + FP)
* Recall = TP / (TP + FN)
* F1 measure = 2 * Precision * Recall / (Precision + Recall)
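To make these formulas concrete, here is a tiny worked example with hypothetical confusion-matrix counts (illustrative numbers, not from the iris model below):
In [ ]:
tp, fp, tn, fn = 40, 10, 45, 5   # hypothetical counts
precision = tp / (tp + fp)       # 0.8
recall = tp / (tp + fn)          # ~0.889
accuracy = (tp + tn) / (tp + fp + tn + fn)   # 0.85
f1 = 2 * precision * recall / (precision + recall)
print(precision, recall, accuracy, f1)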
In [ ]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
iris_df = spark.read.csv(path = '../data/iris.data',
header = False,
inferSchema = True)
iris_df.printSchema()
setosa_udf = udf(lambda x: "a" if x == "Iris-setosa" else "b", StringType())
iris_df = iris_df.withColumn("_c5", setosa_udf(iris_df["_c4"]))
pred_vars = ['_c0', '_c1', '_c2', '_c3']
vectorAssembler = VectorAssembler(
inputCols = pred_vars,
outputCol = 'features')
vec_iris_df = vectorAssembler.transform(iris_df)
indexer = StringIndexer(inputCol="_c5", outputCol="categoryIndex")
vec_iris_df = indexer.fit(vec_iris_df).transform(vec_iris_df)
vec_iris_df.sample(False, .05).show()
vec_iris_df = vec_iris_df.withColumn("label", vec_iris_df["categoryIndex"])
train_iris_df, test_iris_df = vec_iris_df.randomSplit([0.9, 0.1], seed=12345)
In [ ]:
lr = LogisticRegression(maxIter=10, regParam=0.001)
# Fit the model
lrModel = lr.fit(train_iris_df)
#predict
predictions = lrModel.transform(test_iris_df)
In [ ]:
from pyspark.mllib.evaluation import MulticlassMetrics
predictionAndLabels = predictions.rdd.map(lambda x: (x.prediction, x.label))
metrics = MulticlassMetrics(predictionAndLabels)
print("accuracy: {}".format(metrics.accuracy))
print("recall: {}".format(metrics.recall()))
print("precision: {}".format(metrics.precision()))
print("f1 measure: {}".format(metrics.fMeasure()))
When dealing with productionization, we can divide the way our model behaves into 4 flavours, which result from combining two dimensions.
The main two things to take into account are:
* How prediction runs: ranging from more static to more dynamic, we can differentiate between running the model over a static dataset and letting our users query the model as they need to.
* How learning runs: ranging from more static to more dynamic, we can differentiate between running the learning process once and re-learning as soon as we acquire new labeled samples.