Generally, use of MLlib for supervised and unsupervised learning follows some or all of the stages in the following template: load the data into a DataFrame, assemble the raw columns into a feature vector, scale or otherwise pre-process the features, convert the label to a numeric index, fit a model, and apply the model to new data.
This is often assembled as a pipeline for convenience and reproducibility. This is very similar to what you would do with sklearn, except that MLlib allows you to handle massive datasets by distributing the analysis to multiple computers.
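A schematic sketch of that template is shown below. It is only meant to orient you: the column names, train_df, and test_df are placeholders, not part of the sonar example that follows.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import LogisticRegression

stages = [
    VectorAssembler(inputCols=['x0', 'x1'], outputCol='features'),    # assemble raw columns into a vector
    StandardScaler(inputCol='features', outputCol='std_features'),    # scale the features
    StringIndexer(inputCol='class', outputCol='label'),               # convert the label to a numeric index
    LogisticRegression(featuresCol='std_features', labelCol='label'), # the model to fit
]
model = Pipeline(stages=stages).fit(train_df)   # fit every stage in order on the training data
predictions = model.transform(test_df)          # apply the fitted pipeline to new data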
In [1]:
from pyspark import SparkContext
sc = SparkContext('local[*]')
In [2]:
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)
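On more recent Spark releases the SparkSession entry point subsumes both SparkContext and SQLContext; an equivalent setup (assuming Spark 2.x or later) would be:

from pyspark.sql import SparkSession

# SparkSession bundles the functionality of SparkContext and SQLContext
spark = SparkSession.builder.master('local[*]').getOrCreate()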
In [3]:
import numpy as np
import matplotlib.pyplot as plt

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import GaussianMixture
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
We saw this machine learning problem previously with sklearn, where the task is to distinguish rocks from mines using 60 numerical sonar features. We will illustrate some of the mechanics of how to work with MLlib - this is not intended to be a serious attempt at modeling the data.
In [4]:
df = (sqlc.read.format('com.databricks.spark.csv')
      .options(header='false', inferschema='true')
      .load('data/sonar.all-data.txt'))
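The com.databricks.spark.csv data source is only needed on Spark 1.x; later releases ship a built-in csv reader, so an equivalent call (assuming Spark 2.0 or later) would be:

df = sqlc.read.csv('data/sonar.all-data.txt', header=False, inferSchema=True)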
In [5]:
df.printSchema()
In [6]:
df = df.withColumnRenamed("C60","label")
Transform the 60 features into MLlib vectors
In [7]:
assembler = VectorAssembler(
    inputCols=['C%d' % i for i in range(60)],
    outputCol="features")
output = assembler.transform(df)
Scale features to have zero mean and unit standard deviation
In [8]:
standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='features',
                              outputCol='std_features')
model = standardizer.fit(output)
output = model.transform(output)
Convert label to numeric index
In [9]:
indexer = StringIndexer(inputCol="label", outputCol="label_idx")
indexed = indexer.fit(output).transform(output)
Extract only columns of interest
In [10]:
sonar = indexed.select(['std_features', 'label', 'label_idx'])
In [11]:
sonar.show(n=3)
We will first fit a Gaussian Mixture Model with 2 components to the first 2 principal components of the data as an example of unsupervised learning. The GaussianMixture model requires an RDD of vectors, not a DataFrame. Note that pyspark converts numpy arrays to Spark vectors.
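As a quick illustration of that conversion (an aside, not part of the analysis itself):

import numpy as np
from pyspark.mllib.linalg import Vectors, DenseVector

# a NumPy array passed where MLlib expects a vector is wrapped as a DenseVector
v = Vectors.dense(np.array([0.1, 0.2, 0.3]))
isinstance(v, DenseVector)  # True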
In [12]:
pca = PCA(k=2, inputCol="std_features", outputCol="pca")
model = pca.fit(sonar)
transformed = model.transform(sonar)
In [13]:
# extract the PCA vector from each Row and convert it to a NumPy array
features = transformed.select('pca').rdd.map(lambda x: np.array(x[0]))
In [14]:
features.take(3)
Out[14]:
In [15]:
gmm = GaussianMixture.train(features, k=2)
In [16]:
predict = gmm.predict(features).collect()
In [17]:
labels = sonar.select('label_idx').rdd.map(lambda r: r[0]).collect()
The GMM does a poor job of clustering rocks and mines based on the first 2 principal components of the sonar data.
In [18]:
np.corrcoef(predict, labels)
Out[18]:
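Because the cluster indices returned by the GMM are arbitrary (0 and 1 could just as well be swapped), the sign of the correlation is not meaningful. A label-permutation-invariant measure of agreement, such as scikit-learn's adjusted Rand index, is one alternative; a minimal sketch, assuming scikit-learn is available on the driver:

from sklearn.metrics import adjusted_rand_score

# 0 indicates chance-level agreement between the partitions, 1 indicates identical partitions
adjusted_rand_score(labels, predict)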
Plot the discrepancy between the predicted clusters and the true labels
In [19]:
xs = np.array(features.collect()).squeeze()
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
axes[0].scatter(xs[:, 0], xs[:,1], c=predict)
axes[0].set_title('Predicted')
axes[1].scatter(xs[:, 0], xs[:,1], c=labels)
axes[1].set_title('Labels')
pass
We will fit a logistic regression model to the data as an example of supervised learning.
In [20]:
sonar.show(n=3)
Convert to the LabeledPoint format expected by the regression functions in pyspark.mllib
In [21]:
data = sonar.rdd.map(lambda x: LabeledPoint(x[2], x[0]))
Split into test and train sets
In [22]:
train, test = data.randomSplit([0.7, 0.3])
Fit model to training data
In [23]:
model = LogisticRegressionWithLBFGS.train(train)
Evaluate on test data
In [24]:
y_yhat = test.map(lambda x: (x.label, model.predict(x.features)))
err = y_yhat.filter(lambda x: x[0] != x[1]).count() / float(test.count())
print("Error = " + str(err))
Now we assemble all of the pre-processing steps and a logistic regression into a single Pipeline, so that the whole workflow can be fit to the training data and applied to the test data in one step.
In [25]:
transformer = VectorAssembler(inputCols=['C%d' % i for i in range(60)],
                              outputCol="features")
standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='features',
                              outputCol='std_features')
indexer = StringIndexer(inputCol="C60", outputCol="label_idx")
pca = PCA(k=5, inputCol="std_features", outputCol="pca")
# note that the logistic regression is trained on std_features, so the output of the PCA stage is computed but not consumed here
lr = LogisticRegression(featuresCol='std_features', labelCol='label_idx')
pipeline = Pipeline(stages=[transformer, standardizer, indexer, pca, lr])
In [26]:
df = (sqlc.read.format('com.databricks.spark.csv')
      .options(header='false', inferschema='true')
      .load('data/sonar.all-data.txt'))
In [27]:
train, test = df.randomSplit([0.7, 0.3])
In [28]:
model = pipeline.fit(train)
In [29]:
import warnings
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    prediction = model.transform(test)
In [30]:
score = prediction.select(['label_idx', 'prediction'])
score.show(n=score.count())
In [31]:
acc = score.rdd.map(lambda x: x[0] == x[1]).sum() / score.count()
acc
Out[31]:
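The DataFrame API also has built-in evaluators, which avoid dropping down to RDDs; a sketch, assuming a Spark release where the 'accuracy' metric is supported:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='label_idx',
                                              predictionCol='prediction',
                                              metricName='accuracy')
evaluator.evaluate(prediction)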
The spark_sklearn package distributes scikit-learn's grid search and cross-validation over a Spark cluster while keeping the familiar sklearn API. Install it if necessary
! pip install spark_sklearn
In [34]:
from sklearn import svm, datasets
from spark_sklearn import GridSearchCV
iris = datasets.load_iris()
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
svr = svm.SVC()
clf = GridSearchCV(sc, svr, parameters)
clf.fit(iris.data, iris.target)
Out[34]:
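Since spark_sklearn's GridSearchCV is meant to be a drop-in replacement for scikit-learn's, the fitted object can be inspected in the usual way (assuming the fit above completed):

clf.best_params_  # best parameter combination found by the distributed search
clf.best_score_   # its cross-validated score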