In [69]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
df = (spark.read
      .option("inferSchema", True)
      .option("header", True)
      .csv("/data/creditcard-fraud.csv"))

In [6]:
df.limit(10).toPandas()


Out[6]:
Time V1 V2 V3 V4 V5 V6 V7 V8 V9 ... V21 V22 V23 V24 V25 V26 V27 V28 Amount Class
0 0 -1.359807 -0.072781 2.536347 1.378155 -0.338321 0.462388 0.239599 0.098698 0.363787 ... -0.018307 0.277838 -0.110474 0.066928 0.128539 -0.189115 0.133558 -0.021053 149.62 0
1 0 1.191857 0.266151 0.166480 0.448154 0.060018 -0.082361 -0.078803 0.085102 -0.255425 ... -0.225775 -0.638672 0.101288 -0.339846 0.167170 0.125895 -0.008983 0.014724 2.69 0
2 1 -1.358354 -1.340163 1.773209 0.379780 -0.503198 1.800499 0.791461 0.247676 -1.514654 ... 0.247998 0.771679 0.909412 -0.689281 -0.327642 -0.139097 -0.055353 -0.059752 378.66 0
3 1 -0.966272 -0.185226 1.792993 -0.863291 -0.010309 1.247203 0.237609 0.377436 -1.387024 ... -0.108300 0.005274 -0.190321 -1.175575 0.647376 -0.221929 0.062723 0.061458 123.50 0
4 2 -1.158233 0.877737 1.548718 0.403034 -0.407193 0.095921 0.592941 -0.270533 0.817739 ... -0.009431 0.798278 -0.137458 0.141267 -0.206010 0.502292 0.219422 0.215153 69.99 0
5 2 -0.425966 0.960523 1.141109 -0.168252 0.420987 -0.029728 0.476201 0.260314 -0.568671 ... -0.208254 -0.559825 -0.026398 -0.371427 -0.232794 0.105915 0.253844 0.081080 3.67 0
6 4 1.229658 0.141004 0.045371 1.202613 0.191881 0.272708 -0.005159 0.081213 0.464960 ... -0.167716 -0.270710 -0.154104 -0.780055 0.750137 -0.257237 0.034507 0.005168 4.99 0
7 7 -0.644269 1.417964 1.074380 -0.492199 0.948934 0.428118 1.120631 -3.807864 0.615375 ... 1.943465 -1.015455 0.057504 -0.649709 -0.415267 -0.051634 -1.206921 -1.085339 40.80 0
8 7 -0.894286 0.286157 -0.113192 -0.271526 2.669599 3.721818 0.370145 0.851084 -0.392048 ... -0.073425 -0.268092 -0.204233 1.011592 0.373205 -0.384157 0.011747 0.142404 93.20 0
9 9 -0.338262 1.119593 1.044367 -0.222187 0.499361 -0.246761 0.651583 0.069539 -0.736727 ... -0.246914 -0.633753 -0.120794 -0.385050 -0.069733 0.094199 0.246219 0.083076 3.68 0

10 rows × 31 columns
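
The V columns are anonymized numeric features (this dataset is commonly documented as the output of a PCA transform); basic summary statistics for the non-anonymized columns can be pulled the usual way (output omitted):

In [ ]:
# Quick look at the ranges of the non-anonymized columns.
df.select("Time", "Amount", "Class").describe().show()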


In [11]:
feature_columns = [col for col in df.columns if col.startswith("V")]
print(feature_columns)


['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28']
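
Only V1 through V28 are used as features; Time and Amount are excluded by the startswith filter. If Amount were wanted as well, the assembler could simply take the extra column (a hypothetical variant, not used below):

In [ ]:
# Hypothetical variant: append the raw Amount column to the feature vector.
# The rest of the pipeline would be unchanged.
vectorizer_with_amount = VectorAssembler(
    inputCols=feature_columns + ["Amount"],
    outputCol="features")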

In [28]:
vectorizer = VectorAssembler(inputCols=feature_columns, outputCol="features")
vectorizer.transform(df).select("features", "Class").limit(5).toPandas()


Out[28]:
features Class
0 [-1.3598071336738, -0.0727811733098497, 2.5363... 0
1 [1.19185711131486, 0.26615071205963, 0.1664801... 0
2 [-1.35835406159823, -1.34016307473609, 1.77320... 0
3 [-0.966271711572087, -0.185226008082898, 1.792... 0
4 [-1.15823309349523, 0.877736754848451, 1.54871... 0

In [30]:
est = RandomForestClassifier()
est.setMaxDepth(5)
est.setLabelCol("Class")


Out[30]:
RandomForestClassifier_4c658cfbc38faf430271
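
The same configuration can also be passed through the constructor rather than setters; the two forms are equivalent:

In [ ]:
# Equivalent construction: every Param can also be set as a keyword argument.
est = RandomForestClassifier(maxDepth=5, labelCol="Class")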

In [31]:
print(est.explainParams())


cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (default: auto)
featuresCol: features column name. (default: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: Class)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32)
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 5)
maxMemoryInMB: Maximum memory in MB allocated to histogram aggregation. If too small, then 1 node will be split per iteration, and its aggregates may exceed this size. (default: 256)
minInfoGain: Minimum information gain for a split to be considered at a tree node. (default: 0.0)
minInstancesPerNode: Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1. (default: 1)
numTrees: Number of trees to train (>= 1). (default: 20)
predictionCol: prediction column name. (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)
rawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)
seed: random seed. (default: -5387697053847413545)
subsamplingRate: Fraction of the training data used for learning each decision tree, in range (0, 1]. (default: 1.0)

In [33]:
df_train, df_test = df.randomSplit(weights=[0.7, 0.3], seed=1)

In [34]:
pipeline = Pipeline()
pipeline.setStages([vectorizer, est])
model = pipeline.fit(df_train)
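
The fitted pipeline can be persisted and reloaded via the PipelineModel class imported above; the path here is only a placeholder:

In [ ]:
# Placeholder path; overwrite() keeps re-runs from failing on an existing dir.
model.write().overwrite().save("/models/fraud-rf")
reloaded_model = PipelineModel.load("/models/fraud-rf")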

In [45]:
df_test_pred = model.transform(df_test)

In [46]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [47]:
evaluator = BinaryClassificationEvaluator()
evaluator.setLabelCol("Class")


Out[47]:
BinaryClassificationEvaluator_42c282003717c18c5038

In [48]:
evaluator.evaluate(df_test_pred)


Out[48]:
0.9691279828708574
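
BinaryClassificationEvaluator defaults to areaUnderROC, computed from the rawPrediction column. On a dataset this imbalanced, area under the precision-recall curve is often the more informative metric; a sketch, output not shown here:

In [ ]:
# Same evaluator family, but scoring area under the precision-recall curve.
pr_evaluator = BinaryClassificationEvaluator(labelCol="Class",
                                             metricName="areaUnderPR")
pr_evaluator.evaluate(df_test_pred)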

In [53]:
from pyspark.sql.functions import avg, expr

In [59]:
# Accuracy by hand: fraction of rows where the prediction matches the label.
test_accuracy = (df_test_pred
                 .select("Class", "prediction")
                 .withColumn("isEqual", expr("Class == prediction"))
                 .select(avg(expr("cast(isEqual as float)")))
                 .first())

In [60]:
test_accuracy


Out[60]:
Row(avg(CAST(isEqual AS FLOAT))=0.999285011017863)
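
An accuracy near 0.9993 looks impressive, but fraud labels are heavily skewed toward the negative class, so a classifier that always predicts 0 would already score close to 1. Checking the class counts (output not shown) puts the number in context:

In [ ]:
# How skewed are the labels? Count rows per class.
df.groupBy("Class").count().show()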

In [67]:
treeEstimator = DecisionTreeClassifier()
treeEstimator.setImpurity("entropy")
treeEstimator.setLabelCol("Class")

pipeline = Pipeline()
pipeline.setStages([vectorizer, treeEstimator])
model = pipeline.fit(df_train)
evaluator.evaluate(model.transform(df_test))


Out[67]:
0.7787428245059321
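
The lower score versus the forest is expected under the same areaUnderROC metric: a single tree emits only a handful of distinct confidence values, so its ROC curve is coarse. The fitted tree itself can be inspected from the last pipeline stage:

In [ ]:
# The fitted DecisionTreeClassificationModel is the last pipeline stage.
tree_model = model.stages[-1]
print(tree_model.depth, tree_model.numNodes)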

In [75]:
accuracy_evaluator = MulticlassClassificationEvaluator()
accuracy_evaluator.setLabelCol("Class")
accuracy_evaluator.setMetricName("accuracy")
accuracy_evaluator.evaluate(model.transform(df_test))


Out[75]:
0.9992732898870083

In [78]:
f1_evaluator = MulticlassClassificationEvaluator()
f1_evaluator.setLabelCol("Class")
f1_evaluator.setMetricName("f1")
f1_evaluator.evaluate(model.transform(df_test))


Out[78]:
0.9992479231033246
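
A natural next step, not run here, is to tune the forest with cross-validation instead of comparing hand-picked models; a sketch scored on area under the PR curve:

In [ ]:
# Hypothetical tuning sketch: grid over maxDepth/numTrees, 3-fold CV,
# scored on area under the precision-recall curve.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

rf = RandomForestClassifier(labelCol="Class")
grid = (ParamGridBuilder()
        .addGrid(rf.maxDepth, [5, 10])
        .addGrid(rf.numTrees, [20, 50])
        .build())
cv = CrossValidator(estimator=Pipeline(stages=[vectorizer, rf]),
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(
                        labelCol="Class", metricName="areaUnderPR"),
                    numFolds=3)
cv_model = cv.fit(df_train)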

In [ ]: