In [2]:
%matplotlib inline

# Render figures at Retina (2x) resolution for high-DPI displays.
%config InlineBackend.figure_format = 'retina'


/usr/local/lib/python2.7/dist-packages/matplotlib/font_manager.py:273: UserWarning: Matplotlib is building the font cache using fc-list. This may take a moment.
  warnings.warn('Matplotlib is building the font cache using fc-list. This may take a moment.')

In [3]:
# Core scientific stack (numpy/pandas were previously imported twice — deduplicated).
import numpy as np
import pandas as pd
from scipy import stats, integrate
import matplotlib.pyplot as plt
import seaborn as sns

# Spark SQL entry point and type definitions.
# NOTE(review): wildcard import kept for backward compatibility with later
# cells that may rely on pyspark.sql.types names being in scope.
from pyspark.sql import SQLContext
from pyspark.sql.types import *

In [4]:
# Load the software catalog JSON and keep only the columns the pipeline needs.
# (sqlContext is assumed to be pre-created by the notebook's Spark kernel.)
itemsDF = sqlContext.read \
            .json("file:/root/pipeline/myapps/html/advancedspark.com/json/software.json") \
            .select("id", "title", "category", "description")

In [5]:
import pyspark.sql.functions as func  # NOTE(review): unused in this cell; kept in case later cells rely on it

# Count items per category, keep only categories with more than 10 items,
# and sort by frequency.  Filtering BEFORE sorting expresses the intent
# directly: only the surviving rows need to be ordered.
categoriesDF = itemsDF.select("category") \
                 .groupBy("category") \
                 .count() \
                 .filter("count > 10") \
                 .orderBy("count", ascending=False) \
                 .toPandas()

categoriesDF


Out[5]:
category count
0 Library 15
1 Database 12
2 Data Processing 11

In [6]:
# Bar chart of item counts for the most common categories.
sns.set_style("whitegrid")
ax = sns.barplot(x="category", y="count", data=categoriesDF)



In [7]:
from pyspark.ml.feature import RegexTokenizer

# Tokenize descriptions into words.  gaps=False means the regex matches the
# tokens themselves (runs of Unicode letters), not the separators between them.
tokenizer = RegexTokenizer(
    inputCol="description",
    outputCol="words",
    gaps=False,
    pattern="\\p{L}+",
)
tokenizer


Out[7]:
RegexTokenizer_4e15a0a280cc709aaa34

In [8]:
from pyspark.ml.feature import StopWordsRemover

# Drop common stop words from the token list, matching case-insensitively.
stopWordsFilter = StopWordsRemover(
    inputCol="words",
    outputCol="filteredWords",
    caseSensitive=False,
)
stopWordsFilter


Out[8]:
StopWordsRemover_476fb6af6a90061084e5

In [9]:
from pyspark.ml.feature import HashingTF

# Term-frequency features via the hashing trick over the filtered tokens.
tf = HashingTF(inputCol="filteredWords", outputCol="tfFeatures")
tf


Out[9]:
HashingTF_40c6913b313b72ae6d3a

In [10]:
from pyspark.ml.feature import IDF

# Rescale raw term frequencies by inverse document frequency (TF-IDF).
idf = IDF(inputCol="tfFeatures", outputCol="idfFeatures")
idf


Out[10]:
IDF_404b86ac3cf02f838132

In [11]:
from pyspark.ml.feature import StringIndexer

# Map category strings to numeric label indices; fitting learns the label set
# from the data so it can be reversed later with IndexToString.
categoryIndexer = StringIndexer(inputCol="category", outputCol="indexedCategory")
categoryIndexerModel = categoryIndexer.fit(itemsDF)
categoryIndexerModel


Out[11]:
StringIndexer_4df48918d505a6702c4b

In [12]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree trained on the TF-IDF features, predicting the indexed category.
classifier = DecisionTreeClassifier(
    featuresCol="idfFeatures",
    labelCol="indexedCategory",
    predictionCol="prediction",
    rawPredictionCol="confidence",
    probabilityCol="probability",
)
classifier


Out[12]:
DecisionTreeClassifier_4f8ba06215455eca81fa

In [13]:
from pyspark.ml.feature import IndexToString

# Convert numeric predictions back into the original category strings,
# reusing the label mapping learned by the StringIndexer above.
categoryReverseIndexer = IndexToString(
    inputCol="prediction",
    outputCol="predictedCategory",
    labels=categoryIndexerModel.labels,
)
categoryReverseIndexer


Out[13]:
IndexToString_409c9721b936b46e404f

In [14]:
from pyspark.ml import Pipeline

# Assemble the full text-classification pipeline:
# tokenize -> remove stop words -> TF -> IDF -> index labels -> classify -> decode labels.
stages = [tokenizer, stopWordsFilter, tf, idf,
          categoryIndexer, classifier, categoryReverseIndexer]
pipeline = Pipeline(stages=stages)
pipeline


Out[14]:
Pipeline_4adf91619f72219842e5

In [ ]:
# TODO:  Implement Cross Validation and Grid Search

In [19]:
# Fit the whole pipeline (feature transformers + decision-tree classifier) on the catalog.
pipelineModel = pipeline.fit(itemsDF)

In [20]:
# Run the fitted pipeline and compare true categories to predicted label indices.
# NOTE(review): predictions are made on the training set itself, so this does
# not estimate generalization — cross-validation is still a TODO.
predictionsDF = pipelineModel.transform(itemsDF)

predictionsDF.select("category", "prediction").toPandas()


Out[20]:
category prediction
0 Database 1.0
1 Distributed Cache 0.0
2 Cluster Provision 6.0
3 Container 0.0
4 Cloud Provider 0.0
5 Data Processing 0.0
6 Data Processing 0.0
7 Library 0.0
8 Library 0.0
9 File System 0.0
10 Cluster Resource Manager 0.0
11 Database 0.0
12 Data Processing 0.0
13 Data Processing 0.0
14 Distribution 0.0
15 UI 6.0
16 Data Processing 0.0
17 Message Broker 0.0
18 Library 0.0
19 Search Engine 9.0
20 Search Engine 9.0
21 Library 0.0
22 Data Processing 0.0
23 Cluster Resource Manager 0.0
24 File Format 3.0
25 File Format 3.0
26 Data Processing 0.0
27 Distributed Coordinator 0.0
28 Library 0.0
29 Data Processing 0.0
... ... ...
50 Distributed Cache 7.0
51 File Format 3.0
52 File Format 3.0
53 Database 1.0
54 Cloud Provider 0.0
55 BI 0.0
56 Workflow 0.0
57 Database 0.0
58 Database 1.0
59 Library 0.0
60 Library 0.0
61 Library 0.0
62 File Format 3.0
63 Library 0.0
64 Database 0.0
65 Library 0.0
66 Database 1.0
67 Library 0.0
68 Database 1.0
69 Data Processing 0.0
70 Library 0.0
71 Distributed Cache 7.0
72 Library 0.0
73 Database 1.0
74 File Format 0.0
75 File System 0.0
76 Data Processing 0.0
77 Database 1.0
78 Database 0.0
79 Database 0.0

80 rows × 2 columns

Spark 2.0+ Only!


In [21]:
# Save the fitted pipeline model (decision-tree classifier + feature stages)
# so it can be reloaded later without refitting.
# (The original comment said "Random Forest"; the pipeline uses a DecisionTreeClassifier.)
# NOTE(review): ML persistence requires Spark 2.0+ — see the heading above.
pipelineModelPersistPath = "/tmp/spark/2.0.0/dt"
pipelineModel.save(pipelineModelPersistPath)

In [18]:
from pyspark.ml import PipelineModel

# Reload the persisted pipeline model from disk.
# NOTE(review): the recorded run raised AttributeError — PipelineModel.load
# exists only in Spark 2.0+, so this kernel was apparently on an older Spark.
samePipelineModel = PipelineModel.load(pipelineModelPersistPath)



AttributeErrorTraceback (most recent call last)
<ipython-input-22-2ce04733274a> in <module>()
      1 from pyspark.ml import PipelineModel
      2 
----> 3 samePipelineModel = PipelineModel.load(pipelineModelPersistPath)

AttributeError: type object 'PipelineModel' has no attribute 'load'

In [19]:
# Re-run predictions with the reloaded model; output should match Out[20].
# NOTE(review): depends on samePipelineModel from the previous cell, which
# failed in the recorded run — this cell cannot succeed until that is fixed.
samePredictionsDF = samePipelineModel.transform(itemsDF)

samePredictionsDF.select("category", "prediction").toPandas()


Out[19]:
category prediction
0 Database 1.0
1 Distributed Cache 0.0
2 Cluster Provision 6.0
3 Container 0.0
4 Cloud Provider 0.0
5 Data Processing 0.0
6 Data Processing 0.0
7 Library 0.0
8 Library 0.0
9 File System 0.0
10 Cluster Resource Manager 0.0
11 Database 0.0
12 Data Processing 0.0
13 Data Processing 0.0
14 Distribution 0.0
15 UI 6.0
16 Data Processing 0.0
17 Message Broker 0.0
18 Library 0.0
19 Search Engine 9.0
20 Search Engine 9.0
21 Library 0.0
22 Data Processing 0.0
23 Cluster Resource Manager 0.0
24 File Format 3.0
25 File Format 3.0
26 Data Processing 0.0
27 Distributed Coordinator 0.0
28 Library 0.0
29 Data Processing 0.0
... ... ...
50 Distributed Cache 7.0
51 File Format 3.0
52 File Format 3.0
53 Database 1.0
54 Cloud Provider 0.0
55 BI 0.0
56 Workflow 0.0
57 Database 0.0
58 Database 1.0
59 Library 0.0
60 Library 0.0
61 Library 0.0
62 File Format 3.0
63 Library 0.0
64 Database 0.0
65 Library 0.0
66 Database 1.0
67 Library 0.0
68 Database 1.0
69 Data Processing 0.0
70 Library 0.0
71 Distributed Cache 7.0
72 Library 0.0
73 Database 1.0
74 File Format 0.0
75 File System 0.0
76 Data Processing 0.0
77 Database 1.0
78 Database 0.0
79 Database 0.0

80 rows × 2 columns


In [24]:
# Persist the fitted StringIndexerModel on its own.
# NOTE(review): raised AttributeError in the recorded run — StringIndexerModel
# gained .save() in Spark 2.0; this kernel appears to be on an older version.
stringToIndexerPersistPath = "/tmp/spark/2.0.0/s2i"

categoryIndexerModel.save(stringToIndexerPersistPath)



AttributeErrorTraceback (most recent call last)
<ipython-input-24-9d740edcbf1b> in <module>()
----> 1 categoryIndexerModel.save("/tmp/spark/2.0.0/s2i")

AttributeError: 'StringIndexerModel' object has no attribute 'save'

In [26]:
# Persist the IndexToString transformer (prediction -> category decoding).
# NOTE(review): raised AttributeError in the recorded run — transformer .save()
# requires Spark 2.0+, same version issue as the cells above.
index2StringPersistPath = "/tmp/spark/2.0.0/i2s"

categoryReverseIndexer.save(index2StringPersistPath)



AttributeErrorTraceback (most recent call last)
<ipython-input-26-3b1e2e61bd97> in <module>()
      1 index2StringPersistPath = "/tmp/spark/2.0.0/i2s"
      2 
----> 3 categoryReverseIndexer.save(index2StringPersistPath)

AttributeError: 'IndexToString' object has no attribute 'save'

In [ ]: