In [2]:
%matplotlib inline
# Enable Retina mode for higher-resolution inline figures on MacBooks with Retina displays
%config InlineBackend.figure_format = 'retina'
In [3]:
# Consolidated imports (numpy/pandas were each imported twice; duplicates removed).
# Third-party analysis stack first, then Spark.
import numpy as np
import pandas as pd
from scipy import stats, integrate
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SQLContext
from pyspark.sql.types import *
In [4]:
# Load the software-catalog JSON and keep only the columns used downstream.
# NOTE(review): relies on a `sqlContext` provided by the notebook environment.
itemsDF = (
    sqlContext.read
        .format("json")
        .load("file:/root/pipeline/myapps/html/advancedspark.com/json/software.json")
        .select("id", "title", "category", "description")
)
In [5]:
# Per-category item counts, restricted to categories with more than 10 items.
# Filter BEFORE the global sort so discarded rows are never sorted (same
# result, less work). The small aggregate is then collected to pandas.
# (Removed `import pyspark.sql.functions as func` — it was never used.)
categoriesDF = (
    itemsDF.select("category")
        .groupBy("category")
        .count()
        .filter("count > 10")
        .orderBy("count", ascending=False)
        .toPandas()
)
categoriesDF
Out[5]:
In [6]:
# Bar chart of item counts per category (whitegrid theme for readability).
sns.set_style("whitegrid")
plot = sns.barplot(data=categoriesDF, x="category", y="count")
In [7]:
from pyspark.ml.feature import RegexTokenizer

# With gaps=False the regex matches the tokens themselves, so \p{L}+
# extracts runs of Unicode letters from each description.
tokenizer = RegexTokenizer(
    inputCol="description",
    outputCol="words",
    gaps=False,
    pattern="\\p{L}+",
)
tokenizer
Out[7]:
In [8]:
from pyspark.ml.feature import StopWordsRemover

# Drop common stop words (case-insensitively) from the token stream.
stopWordsFilter = StopWordsRemover(
    inputCol="words",
    outputCol="filteredWords",
    caseSensitive=False,
)
stopWordsFilter
Out[8]:
In [9]:
from pyspark.ml.feature import HashingTF

# Term-frequency vectors via the hashing trick (default numFeatures).
tf = HashingTF(inputCol="filteredWords", outputCol="tfFeatures")
tf
Out[9]:
In [10]:
from pyspark.ml.feature import IDF

# Rescale raw term frequencies by inverse document frequency.
idf = IDF(inputCol="tfFeatures", outputCol="idfFeatures")
idf
Out[10]:
In [11]:
from pyspark.ml.feature import StringIndexer

# Map category strings to numeric label indices; fitting learns the label set,
# which is reused later by IndexToString to decode predictions.
categoryIndexer = StringIndexer(inputCol="category", outputCol="indexedCategory")
categoryIndexerModel = categoryIndexer.fit(itemsDF)
categoryIndexerModel
Out[11]:
In [12]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree over the TF-IDF features, predicting the indexed category.
classifier = DecisionTreeClassifier(
    featuresCol="idfFeatures",
    labelCol="indexedCategory",
    predictionCol="prediction",
    rawPredictionCol="confidence",
    probabilityCol="probability",
)
classifier
Out[12]:
In [13]:
from pyspark.ml.feature import IndexToString

# Decode numeric predictions back to category strings, using the label
# ordering learned by the fitted StringIndexer above.
categoryReverseIndexer = IndexToString(
    inputCol="prediction",
    outputCol="predictedCategory",
    labels=categoryIndexerModel.labels,
)
categoryReverseIndexer
Out[13]:
In [14]:
from pyspark.ml import Pipeline

# Chain: tokenize -> remove stop words -> TF -> IDF -> index labels
#        -> classify -> decode predicted labels.
pipeline = Pipeline(stages=[
    tokenizer,
    stopWordsFilter,
    tf,
    idf,
    categoryIndexer,
    classifier,
    categoryReverseIndexer,
])
pipeline
Out[14]:
In [ ]:
# TODO: Implement Cross Validation and Grid Search
In [19]:
# Fit the full pipeline. NOTE(review): trains on the entire dataset with no
# train/test split, so the accuracy shown below is on the training data.
pipelineModel = pipeline.fit(itemsDF)
In [20]:
# Score the (training) data and compare true category vs. predicted index.
predictionsDF = pipelineModel.transform(itemsDF)
comparison = predictionsDF.select("category", "prediction")
comparison.toPandas()
Out[20]:
In [21]:
# Save & load the fitted DecisionTree pipeline model
# (the original comment said "Random Forest", but the classifier above is a
# DecisionTreeClassifier — note the "/dt" path suffix).
pipelineModelPersistPath = "/tmp/spark/2.0.0/dt"
pipelineModel.save(pipelineModelPersistPath)
In [18]:
from pyspark.ml import PipelineModel
# Reload the persisted pipeline to verify round-trip serialization.
samePipelineModel = PipelineModel.load(pipelineModelPersistPath)
In [19]:
# Predictions from the reloaded model — should match the original model's.
samePredictionsDF = samePipelineModel.transform(itemsDF)
sameComparison = samePredictionsDF.select("category", "prediction")
sameComparison.toPandas()
Out[19]:
In [24]:
# Persist the fitted StringIndexer model separately so its label mapping
# can be reused outside the full pipeline.
stringToIndexerPersistPath = "/tmp/spark/2.0.0/s2i"
categoryIndexerModel.save(stringToIndexerPersistPath)
In [26]:
# Persist the IndexToString transformer (prediction-to-label decoder) separately.
index2StringPersistPath = "/tmp/spark/2.0.0/i2s"
categoryReverseIndexer.save(index2StringPersistPath)
In [ ]: