First, we load the data.
In [ ]:
import pyspark.sql.types as typ
labels = [
('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('FATHER_COMBINED_AGE', typ.IntegerType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.IntegerType()),
('DIABETES_GEST', typ.IntegerType()),
('HYP_TENS_PRE', typ.IntegerType()),
('HYP_TENS_GEST', typ.IntegerType()),
('PREV_BIRTH_PRETERM', typ.IntegerType())
]
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv('births_transformed.csv.gz',
header=True,
schema=schema)
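Before going any further, it is worth a quick sanity check that the schema and the data loaded as expected (a minimal sketch using standard DataFrame methods):
In [ ]:
# Optional sanity check: confirm the schema matches our definition
# and count the number of records loaded.
births.printSchema()
births.count()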
Since the OneHotEncoder cannot encode string columns directly, we first cast BIRTH_PLACE to an integer.
In [ ]:
import pyspark.ml.feature as ft
births = births \
.withColumn( 'BIRTH_PLACE_INT',
births['BIRTH_PLACE'] \
.cast(typ.IntegerType()))
Having done this, we can now create our first Transformer.
In [ ]:
encoder = ft.OneHotEncoder(
inputCol='BIRTH_PLACE_INT',
outputCol='BIRTH_PLACE_VEC')
Let's now create a single column with all the features collated together.
In [ ]:
featuresCreator = ft.VectorAssembler(
inputCols=[
col[0]
for col
in labels[2:]] + \
[encoder.getOutputCol()],
outputCol='features'
)
In this example we will (once again) use the Logistic Regression model.
In [ ]:
import pyspark.ml.classification as cl
Once loaded, let's create the model.
In [ ]:
logistic = cl.LogisticRegression(
maxIter=10,
regParam=0.01,
labelCol='INFANT_ALIVE_AT_REPORT')
All that is left now is to create a Pipeline and fit the model. First, let's load the Pipeline from the package.
In [ ]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[
encoder,
featuresCreator,
logistic
])
Conveniently, the DataFrame API has the .randomSplit(...) method.
In [ ]:
births_train, births_test = births \
.randomSplit([0.7, 0.3], seed=666)
Now let's run the pipeline and estimate our model.
In [ ]:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)
Here's what the test_model looks like.
In [ ]:
test_model.take(1)
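If the full Row output is too noisy, you can also peek at just the label and the prediction-related columns (a quick sketch; 'probability' and 'prediction' are the default output columns produced by the logistic regression stage):
In [ ]:
# Show only the label, the predicted probabilities, and the predicted class.
test_model \
    .select('INFANT_ALIVE_AT_REPORT', 'probability', 'prediction') \
    .show(5, truncate=False)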
Obviously, we would now like to test how well our model did.
In [ ]:
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test_model,
{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))
PySpark allows you to save the Pipeline definition for later use.
In [ ]:
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
So, you can load it up later and use it straight away to .fit(...) and predict.
In [ ]:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
.fit(births_train)\
.transform(births_test)\
.take(1)
You can also save the whole model.
In [ ]:
from pyspark.ml import PipelineModel
modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)
loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)
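As a quick sanity check (a sketch), the reloaded PipelineModel should score the test set exactly as the original model did:
In [ ]:
# The reloaded model should reproduce the original model's AUC on the test set.
print(evaluator.evaluate(test_loadedModel,
    {evaluator.metricName: 'areaUnderROC'}))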
Load the .tuning part of the package.
In [ ]:
import pyspark.ml.tuning as tune
Next let's specify our model and the list of parameters we want to loop through.
In [ ]:
logistic = cl.LogisticRegression(
labelCol='INFANT_ALIVE_AT_REPORT')
grid = tune.ParamGridBuilder() \
.addGrid(logistic.maxIter,
[2, 10, 50]) \
.addGrid(logistic.regParam,
[0.01, 0.05, 0.3]) \
.build()
Next, we need some way of comparing the models.
In [ ]:
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol='INFANT_ALIVE_AT_REPORT')
Create the logic that will do the validation work for us.
In [ ]:
cv = tune.CrossValidator(
estimator=logistic,
estimatorParamMaps=grid,
evaluator=evaluator
)
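By default the CrossValidator uses three folds; if you want to trade runtime for a more stable estimate, you can set the numFolds parameter explicitly (a sketch; the value shown below is simply the default):
In [ ]:
# numFolds is a standard CrossValidator parameter; 3 is its default value,
# so this cell is equivalent to the one above.
cv = tune.CrossValidator(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3
)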
Create a purely transforming Pipeline.
In [ ]:
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(births_train)
Having done this, we are ready to find the optimal combination of parameters for our model.
In [ ]:
cvModel = cv.fit(data_transformer.transform(births_train))
The cvModel will return the best model estimated. We can now use it to see if it performed better than our previous model.
In [ ]:
data_test = data_transformer \
    .transform(births_test)
results = cvModel.transform(data_test)
print(evaluator.evaluate(results,
{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results,
{evaluator.metricName: 'areaUnderPR'}))
What parameters does the best model have? The answer is a little bit convoluted, but here's how you can extract it.
In [ ]:
results = [
(
[
{key.name: paramValue}
for key, paramValue
in zip(
params.keys(),
params.values())
], metric
)
for params, metric
in zip(
cvModel.getEstimatorParamMaps(),
cvModel.avgMetrics
)
]
sorted(results,
key=lambda el: el[1],
reverse=True)[0]
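A shorter route to the same answer (a sketch that uses only the attributes already shown above) is to pair each parameter map with its average metric and take the maximum:
In [ ]:
# max() over (paramMap, metric) pairs returns the best combination directly.
best_params, best_metric = max(
    zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics),
    key=lambda el: el[1])
print(best_metric)
print({param.name: value for param, value in best_params.items()})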
Use the ChiSqSelector to select only the top 5 features, thus limiting the complexity of our model.
In [ ]:
selector = ft.ChiSqSelector(
numTopFeatures=5,
featuresCol=featuresCreator.getOutputCol(),
outputCol='selectedFeatures',
labelCol='INFANT_ALIVE_AT_REPORT'
)
logistic = cl.LogisticRegression(
labelCol='INFANT_ALIVE_AT_REPORT',
featuresCol='selectedFeatures'
)
pipeline = Pipeline(stages=[encoder,featuresCreator,selector])
data_transformer = pipeline.fit(births_train)
The TrainValidationSplit object gets created in the same fashion as the CrossValidator model.
In [ ]:
tvs = tune.TrainValidationSplit(
estimator=logistic,
estimatorParamMaps=grid,
evaluator=evaluator
)
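Unlike the CrossValidator, the TrainValidationSplit performs a single random split; the proportion used for training is controlled by the trainRatio parameter (a sketch; 0.75 shown below is simply the default):
In [ ]:
# trainRatio is a standard TrainValidationSplit parameter; 0.75 is its default,
# so this cell is equivalent to the one above.
tvs = tune.TrainValidationSplit(
    estimator=logistic,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    trainRatio=0.75
)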
As before, we fit our data to the model, and calculate the results.
In [ ]:
tvsModel = tvs.fit(
data_transformer \
.transform(births_train)
)
data_test = data_transformer \
    .transform(births_test)
results = tvsModel.transform(data_test)
print(evaluator.evaluate(results,
{evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results,
{evaluator.metricName: 'areaUnderPR'}))
Let's create a simple dataset.
In [ ]:
text_data = spark.createDataFrame([
['''Machine learning can be applied to a wide variety
of data types, such as vectors, text, images, and
structured data. This API adopts the DataFrame from
Spark SQL in order to support a variety of data types.'''],
['''DataFrame supports many basic and structured types;
see the Spark SQL datatype reference for a list of
supported types. In addition to the types listed in
the Spark SQL guide, DataFrame can use ML Vector types.'''],
['''A DataFrame can be created either implicitly or
explicitly from a regular RDD. See the code examples
below and the Spark SQL programming guide for examples.'''],
['''Columns in a DataFrame are named. The code examples
below use names such as "text," "features," and "label."''']
], ['input'])
First, we need to tokenize this text.
In [ ]:
tokenizer = ft.RegexTokenizer(
inputCol='input',
outputCol='input_arr',
pattern=r'\s+|[,."]')
The output of the tokenizer looks similar to this.
In [ ]:
tok = tokenizer \
.transform(text_data) \
.select('input_arr')
tok.take(1)
Use the StopWordsRemover(...).
In [ ]:
stopwords = ft.StopWordsRemover(
inputCol=tokenizer.getOutputCol(),
outputCol='input_stop')
The output of the method looks as follows:
In [ ]:
stopwords.transform(tok).select('input_stop').take(1)
Build the NGram model and the Pipeline.
In [ ]:
ngram = ft.NGram(n=2,
inputCol=stopwords.getOutputCol(),
outputCol="nGrams")
pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])
Now that we have the pipeline, we follow a very similar process as before.
In [ ]:
data_ngram = pipeline \
.fit(text_data) \
.transform(text_data)
data_ngram.select('nGrams').take(1)
That's it. We have our n-grams and can now use them in further NLP processing.
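For example (a sketch that is not part of the original pipeline), you could hash the bigrams into fixed-length term-frequency vectors with HashingTF and feed those into any of the classifiers shown earlier; the column name 'tf_features' is just an illustrative choice:
In [ ]:
# HashingTF maps each list of n-grams onto a fixed-size term-frequency vector.
hashingTF = ft.HashingTF(
    inputCol=ngram.getOutputCol(),
    outputCol='tf_features',
    numFeatures=1 << 10)
hashingTF.transform(data_ngram).select('tf_features').take(1)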
It is sometimes useful to band the values into discrete buckets.
In [ ]:
import numpy as np
x = np.arange(0, 100)
x = x / 100.0 * np.pi * 4
y = x * np.sin(x / 1.764) + 20.1234
schema = typ.StructType([
typ.StructField('continuous_var',
typ.DoubleType(),
False
)
])
data = spark.createDataFrame([[float(e), ] for e in y], schema=schema)
Use the QuantileDiscretizer model to split our continuous variable into 5 buckets (see the numBuckets parameter).
In [ ]:
discretizer = ft.QuantileDiscretizer(
numBuckets=5,
inputCol='continuous_var',
outputCol='discretized')
Let's see what we got.
In [ ]:
data_discretized = discretizer.fit(data).transform(data)
data_discretized \
.groupby('discretized')\
.mean('continuous_var')\
.sort('discretized')\
.collect()
The StandardScaler operates on a vector column, so we first assemble the continuous variable into a single-element vector.
In [ ]:
vectorizer = ft.VectorAssembler(
inputCols=['continuous_var'],
outputCol= 'continuous_vec')
Build a normalizer and a pipeline.
In [ ]:
normalizer = ft.StandardScaler(
inputCol=vectorizer.getOutputCol(),
outputCol='normalized',
withMean=True,
withStd=True
)
pipeline = Pipeline(stages=[vectorizer, normalizer])
data_standardized = pipeline.fit(data).transform(data)
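A quick peek at the result (a sketch): after the StandardScaler, the normalized column should have roughly zero mean and unit standard deviation.
In [ ]:
# Compare the raw and the standardized values side by side.
data_standardized \
    .select('continuous_var', 'normalized') \
    .take(3)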
We will now use the RandomForestClassifier to model the chances of survival for an infant.
First, we need to cast the label feature to DoubleType.
In [ ]:
import pyspark.sql.functions as func
births = births.withColumn(
'INFANT_ALIVE_AT_REPORT',
func.col('INFANT_ALIVE_AT_REPORT').cast(typ.DoubleType())
)
births_train, births_test = births \
.randomSplit([0.7, 0.3], seed=666)
We are ready to build our model.
In [ ]:
classifier = cl.RandomForestClassifier(
numTrees=5,
maxDepth=5,
labelCol='INFANT_ALIVE_AT_REPORT')
pipeline = Pipeline(
stages=[
encoder,
featuresCreator,
classifier])
model = pipeline.fit(births_train)
test = model.transform(births_test)
Let's now see how the RandomForestClassifier model performs compared to the LogisticRegression.
In [ ]:
evaluator = ev.BinaryClassificationEvaluator(
labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test,
{evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test,
{evaluator.metricName: "areaUnderPR"}))
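If you also want to know which features the forest relies on most, the fitted RandomForestClassificationModel exposes featureImportances (a sketch; indexing with stages[-1] assumes the classifier is the last stage of our pipeline, as it is here):
In [ ]:
# The last stage of the fitted PipelineModel is the random forest model itself.
rf_model = model.stages[-1]
print(rf_model.featureImportances)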
Let's test how well a single tree would do, then.
In [ ]:
classifier = cl.DecisionTreeClassifier(
maxDepth=5,
labelCol='INFANT_ALIVE_AT_REPORT')
pipeline = Pipeline(stages=[
encoder,
featuresCreator,
classifier]
)
model = pipeline.fit(births_train)
test = model.transform(births_test)
evaluator = ev.BinaryClassificationEvaluator(
labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test,
{evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test,
{evaluator.metricName: "areaUnderPR"}))
In this example we will use the k-means model to find similarities in the births data.
In [ ]:
import pyspark.ml.clustering as clus
kmeans = clus.KMeans(k = 5,
featuresCol='features')
pipeline = Pipeline(stages=[
encoder,
featuresCreator,
kmeans]
)
model = pipeline.fit(births_train)
Having estimated the model, let's see if we can find some differences between clusters.
In [ ]:
test = model.transform(births_test)
test \
.groupBy('prediction') \
.agg({
'*': 'count',
'MOTHER_HEIGHT_IN': 'avg'
}).collect()
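You can also inspect the cluster centres themselves (a sketch; as before, stages[-1] assumes the KMeans model is the last stage of our pipeline):
In [ ]:
# The fitted KMeansModel sits at the end of the PipelineModel.
kmeans_model = model.stages[-1]
for center in kmeans_model.clusterCenters():
    print(center)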
In the field of NLP, problems such as topic extraction rely on clustering to detect documents with similar topics. First, let's create our dataset.
In [ ]:
text_data = spark.createDataFrame([
['''To make a computer do anything, you have to write a
computer program. To write a computer program, you have
to tell the computer, step by step, exactly what you want
it to do. The computer then "executes" the program,
following each step mechanically, to accomplish the end
goal. When you are telling the computer what to do, you
also get to choose how it's going to do it. That's where
computer algorithms come in. The algorithm is the basic
technique used to get the job done. Let's follow an
example to help get an understanding of the algorithm
concept.'''],
['''Laptop computers use batteries to run while not
connected to mains. When we overcharge or overheat
lithium ion batteries, the materials inside start to
break down and produce bubbles of oxygen, carbon dioxide,
and other gases. Pressure builds up, and the hot battery
swells from a rectangle into a pillow shape. Sometimes
the phone involved will operate afterwards. Other times
it will die. And occasionally—kapow! To see what's
happening inside the battery when it swells, the CLS team
used an x-ray technology called computed tomography.'''],
['''This technology describes a technique where touch
sensors can be placed around any side of a device
allowing for new input sources. The patent also notes
that physical buttons (such as the volume controls) could
be replaced by these embedded touch sensors. In essence
Apple could drop the current buttons and move towards
touch-enabled areas on the device for the existing UI. It
could also open up areas for new UI paradigms, such as
using the back of the smartphone for quick scrolling or
page turning.'''],
['''The National Park Service is a proud protector of
America’s lands. Preserving our land not only safeguards
the natural environment, but it also protects the
stories, cultures, and histories of our ancestors. As we
face the increasingly dire consequences of climate
change, it is imperative that we continue to expand
America’s protected lands under the oversight of the
National Park Service. Doing so combats climate change
and allows all American’s to visit, explore, and learn
from these treasured places for generations to come. It
is critical that President Obama acts swiftly to preserve
land that is at risk of external threats before the end
of his term as it has become blatantly clear that the
next administration will not hold the same value for our
environment over the next four years.'''],
['''The National Park Foundation, the official charitable
partner of the National Park Service, enriches America’s
national parks and programs through the support of
private citizens, park lovers, stewards of nature,
history enthusiasts, and wilderness adventurers.
Chartered by Congress in 1967, the Foundation grew out of
a legacy of park protection that began over a century
ago, when ordinary citizens took action to establish and
protect our national parks. Today, the National Park
Foundation carries on the tradition of early park
advocates, big thinkers, doers and dreamers—from John
Muir and Ansel Adams to President Theodore Roosevelt.'''],
['''Australia has over 500 national parks. Over 28
million hectares of land is designated as national
parkland, accounting for almost four per cent of
Australia's land areas. In addition, a further six per
cent of Australia is protected and includes state
forests, nature parks and conservation reserves.National
parks are usually large areas of land that are protected
because they have unspoilt landscapes and a diverse
number of native plants and animals. This means that
commercial activities such as farming are prohibited and
human activity is strictly monitored.''']
], ['documents'])
First, we will once again use the RegexTokenizer and the StopWordsRemover models.
In [ ]:
tokenizer = ft.RegexTokenizer(
inputCol='documents',
outputCol='input_arr',
pattern=r'\s+|[,."]')
stopwords = ft.StopWordsRemover(
inputCol=tokenizer.getOutputCol(),
outputCol='input_stop')
Next in our pipeline is the CountVectorizer.
In [ ]:
countVectorizer = ft.CountVectorizer(
    inputCol=stopwords.getOutputCol(),
    outputCol="input_indexed")
tokenized = stopwords \
.transform(
tokenizer\
.transform(text_data)
)
countVectorizer \
.fit(tokenized)\
.transform(tokenized)\
.select('input_indexed')\
.take(2)
We will use the LDA model - the Latent Dirichlet Allocation model - to extract the topics.
In [ ]:
clustering = clus.LDA(
    k=2,
    optimizer='online',
    featuresCol=countVectorizer.getOutputCol())
Let's put these pieces together.
In [ ]:
pipeline = Pipeline(stages=[
tokenizer,
stopwords,
countVectorizer,
clustering]
)
Let's see if we have properly uncovered the topics.
In [ ]:
topics = pipeline \
.fit(text_data) \
.transform(text_data)
topics.select('topicDistribution').collect()
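To see which terms actually define each topic (a sketch), keep the fitted PipelineModel around and call describeTopics() on the LDA stage; the term indices it returns refer back to the CountVectorizer vocabulary:
In [ ]:
# Keep the fitted pipeline so we can reach the fitted LDAModel (the last stage).
topic_model = pipeline.fit(text_data)
topic_model \
    .stages[-1] \
    .describeTopics(maxTermsPerTopic=5) \
    .show(truncate=False)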
In this section we will try to predict the MOTHER_WEIGHT_GAIN.
In [ ]:
features = ['MOTHER_AGE_YEARS','MOTHER_HEIGHT_IN',
'MOTHER_PRE_WEIGHT','DIABETES_PRE',
'DIABETES_GEST','HYP_TENS_PRE',
'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',
'CIG_BEFORE','CIG_1_TRI', 'CIG_2_TRI',
'CIG_3_TRI'
]
First, we will collate all the features together and use the ChiSqSelector to select only the top 6 most important features.
In [ ]:
featuresCreator = ft.VectorAssembler(
    inputCols=features,  # collate all of the features listed above
    outputCol='features'
)
selector = ft.ChiSqSelector(
numTopFeatures=6,
outputCol="selectedFeatures",
labelCol='MOTHER_WEIGHT_GAIN'
)
In order to predict the weight gain we will use the gradient boosted trees regressor.
In [ ]:
import pyspark.ml.regression as reg
regressor = reg.GBTRegressor(
maxIter=15,
maxDepth=3,
labelCol='MOTHER_WEIGHT_GAIN')
Finally, again, we put it all together into a Pipeline.
In [ ]:
pipeline = Pipeline(stages=[
featuresCreator,
selector,
regressor])
weightGain = pipeline.fit(births_train)
Having created the weightGain model, let's see if it performs well on our testing data.
In [ ]:
evaluator = ev.RegressionEvaluator(
predictionCol="prediction",
labelCol='MOTHER_WEIGHT_GAIN')
print(evaluator.evaluate(
weightGain.transform(births_test),
{evaluator.metricName: 'r2'}))
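The same evaluator can report other regression metrics as well, for example the root mean squared error (a sketch; 'rmse' is one of the standard RegressionEvaluator metric names):
In [ ]:
# RMSE is expressed in the same units as MOTHER_WEIGHT_GAIN itself.
print(evaluator.evaluate(
    weightGain.transform(births_test),
    {evaluator.metricName: 'rmse'}))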