We need to load data from a file into a Spark DataFrame. Each row is an observed customer, and each column contains attributes of that customer.
The data comes from the UCI data set repository, hosted by SGI.
Fields:
state: discrete.
account length: numeric.
area code: numeric.
phone number: discrete.
international plan: discrete.
voice mail plan: discrete.
number vmail messages: numeric.
total day minutes: numeric.
total day calls: numeric.
total day charge: numeric.
total eve minutes: numeric.
total eve calls: numeric.
total eve charge: numeric.
total night minutes: numeric.
total night calls: numeric.
total night charge: numeric.
total intl minutes: numeric.
total intl calls: numeric.
total intl charge: numeric.
number customer service calls: numeric.
churned: discrete.
'Numeric' and 'discrete' do not adequately describe the fundamental differences among the attributes.
Area codes are labelled numeric, but they are better thought of as a categorical variable. Attributes that are genuinely numeric have a reasonable notion of distance between values. Area codes do not fall into this category: the difference |area_code_1 - area_code_2| can be small, but that small distance does not mean the two area codes are similar in any meaningful way.
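For example, area codes 415 (San Francisco) and 408 (San Jose) differ by 7, while 408 and 407 (Orlando) differ by only 1, even though the latter pair is on opposite coasts; a throwaway check makes the point:
# The numeric distance between area codes carries no geographic or behavioral meaning.
abs(415 - 408), abs(408 - 407)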
In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
# Define an explicit schema so each CSV column gets a meaningful name and type.
schema = StructType([ \
StructField("state", StringType(), True), \
StructField("account_length", DoubleType(), True), \
StructField("area_code", StringType(), True), \
StructField("phone_number", StringType(), True), \
StructField("intl_plan", StringType(), True), \
StructField("voice_mail_plan", StringType(), True), \
StructField("number_vmail_messages", DoubleType(), True), \
StructField("total_day_minutes", DoubleType(), True), \
StructField("total_day_calls", DoubleType(), True), \
StructField("total_day_charge", DoubleType(), True), \
StructField("total_eve_minutes", DoubleType(), True), \
StructField("total_eve_calls", DoubleType(), True), \
StructField("total_eve_charge", DoubleType(), True), \
StructField("total_night_minutes", DoubleType(), True), \
StructField("total_night_calls", DoubleType(), True), \
StructField("total_night_charge", DoubleType(), True), \
StructField("total_intl_minutes", DoubleType(), True), \
StructField("total_intl_calls", DoubleType(), True), \
StructField("total_intl_charge", DoubleType(), True), \
StructField("number_customer_service_calls", DoubleType(), True), \
StructField("churned", StringType(), True)])
# Read the raw CSV file using the spark-csv package, applying the schema defined above.
churn_data = sqlContext.read \
.format('com.databricks.spark.csv') \
.load('churn.all', schema = schema)
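If you want to sanity-check the load, printing the schema and a few rows is a quick way to confirm that the columns were parsed as expected:
# Verify that the schema was applied and the first rows look sensible.
churn_data.printSchema()
churn_data.show(5)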
DataFrames essentially allow us to express SQL-like statements: we can filter, count, aggregate, and so on. See the DataFrame Operations documentation.
In [2]:
count = churn_data.count()
# Note the leading space in " yes": the raw CSV values are padded with a space.
voice_mail_plans = churn_data.filter(churn_data.voice_mail_plan == " yes").count()
"%d, %d" % (count, voice_mail_plans)
Out[2]:
In [3]:
# Your code here
In [4]:
# Sample roughly half the rows (without replacement, seed 83) and collect them to the
# driver as a pandas DataFrame so we can plot locally.
sample_data = churn_data.sample(False, 0.5, 83).toPandas()
sample_data.head()
Out[4]:
The type of visualization we use depends on the data type, so let's first define which columns are numeric and which are categorical:
In [5]:
numeric_cols = ["account_length", "number_vmail_messages", "total_day_minutes",
"total_day_calls", "total_day_charge", "total_eve_minutes",
"total_eve_calls", "total_eve_charge", "total_night_minutes",
"total_night_calls", "total_intl_minutes", "total_intl_calls",
"total_intl_charge"]
categorical_cols = ["state", "intl_plan", "voice_mail_plan", "area_code"]
We want to examine the distribution of our features, so let's start by looking at them one at a time.
Seaborn has a standard function called distplot() that allows us to easily examine the distribution of a column of a pandas DataFrame or a NumPy array.
In [6]:
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sb
sb.distplot(sample_data['number_customer_service_calls'], kde=False)
Out[6]:
We can also examine how the distribution of a feature changes when we condition (split) the data on whether or not the customer churned.
In [7]:
sb.boxplot(x="churned", y="number_customer_service_calls", data=sample_data)
Out[7]:
Looking at joint distributions of the data can also tell us a lot, particularly about redundant features. Seaborn's pairplot() lets us look at joint distributions for many variables at once.
In [8]:
example_numeric_data = sample_data[["total_intl_minutes", "total_intl_calls",
"total_intl_charge", "churned"]]
sb.pairplot(example_numeric_data, hue="churned")
Out[8]:
Clearly there are strong linear relationships between some variables. Let's get a general impression of the correlations between variables by using Seaborn's heatmap functionality.
In [9]:
corr = sample_data[numeric_cols].corr()
sb.heatmap(corr)
Out[9]:
The heatmap shows that some columns, such as the minutes and charge columns, are very strongly correlated, so we drop the redundant ones and generate a pair plot for the reduced set of numeric variables.
In [10]:
reduced_numeric_cols = ["account_length", "number_vmail_messages", "total_day_calls",
"total_day_charge", "total_eve_calls", "total_eve_charge",
"total_night_calls", "total_intl_calls", "total_intl_charge"]
sb.pairplot(sample_data[reduced_numeric_cols + ['churned']], hue="churned", palette='Paired')
Out[10]:
In [11]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
# Convert the string label and the categorical intl_plan column to numeric indices,
# then assemble the selected columns into a single feature vector.
label_indexer = StringIndexer(inputCol = 'churned', outputCol = 'label')
plan_indexer = StringIndexer(inputCol = 'intl_plan', outputCol = 'intl_plan_indexed')
assembler = VectorAssembler(
    inputCols = ['intl_plan_indexed'] + reduced_numeric_cols,
    outputCol = 'features')
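To get a feel for what these stages produce, you can apply them by hand before wiring them into a pipeline; the pipeline in the next cell does exactly this for us, so this is just an illustration:
# StringIndexer is an estimator: fit it to learn the category-to-index mapping, then transform.
indexed = plan_indexer.fit(churn_data).transform(churn_data)
indexed = label_indexer.fit(indexed).transform(indexed)
# VectorAssembler is a plain transformer that packs the selected columns into one vector.
assembler.transform(indexed).select('features', 'label').show(3)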
In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
# Chain the indexers, the assembler, and a decision tree into a single pipeline,
# then fit it on a random 70/30 train/test split.
classifier = DecisionTreeClassifier(labelCol = 'label', featuresCol = 'features')
pipeline = Pipeline(stages=[plan_indexer, label_indexer, assembler, classifier])
(train, test) = churn_data.randomSplit([0.7, 0.3])
model = pipeline.fit(train)
Measure the area under the ROC curve (AUROC) and the area under the precision-recall curve (AUPR) on the held-out test set.
In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.transform(test)
# By default the evaluator uses the rawPrediction and label columns produced by the pipeline.
evaluator = BinaryClassificationEvaluator()
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
aupr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
"The AUROC is %s and the AUPR is %s." % (auroc, aupr)
Out[13]:
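It can also be instructive to eyeball a few individual predictions next to the true labels; prediction and probability are the classifier's default output columns:
# Compare the original label, the indexed label, and the model's prediction for a few test rows.
predictions.select('churned', 'label', 'prediction', 'probability').show(5)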
Fit a random forest classifier to the data. Try experimenting with different values of the maxDepth, numTrees, and impurity parameters (e.g. 'gini' vs. 'entropy') to see which give the best classification performance. Do the settings that give the best classification performance on the training set also give the best classification performance on the test set?
Have a look at the RandomForestClassifier documentation.
In [14]:
from pyspark.ml.classification import RandomForestClassifier
# Your code here
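One possible sketch is below; it reuses the pipeline stages and evaluator defined above, and the hyperparameter values are illustrative starting points rather than tuned choices:
# A random forest with a handful of trees; try varying numTrees, maxDepth, and impurity.
rf_classifier = RandomForestClassifier(labelCol = 'label', featuresCol = 'features',
                                       numTrees = 20, maxDepth = 5, impurity = 'gini')
rf_pipeline = Pipeline(stages=[plan_indexer, label_indexer, assembler, rf_classifier])
rf_model = rf_pipeline.fit(train)
rf_predictions = rf_model.transform(test)
rf_auroc = evaluator.evaluate(rf_predictions, {evaluator.metricName: "areaUnderROC"})
rf_aupr = evaluator.evaluate(rf_predictions, {evaluator.metricName: "areaUnderPR"})
"The random forest AUROC is %s and the AUPR is %s." % (rf_auroc, rf_aupr)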