In [69]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [4]:
# Load the credit-card fraud CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df = spark.read.csv(
    "/data/creditcard-fraud.csv",
    header=True,
    inferSchema=True,
)
In [6]:
# Quick visual check: restrict to 10 rows before converting to pandas, so only
# that small slice is materialised for display.
df.limit(10).toPandas()
Out[6]:
In [11]:
# Model inputs are the columns whose names start with "V"; every other column
# (including the "Class" label) is left out of the feature list.
feature_columns = [name for name in df.columns if name.startswith("V")]
print(feature_columns)
In [28]:
# Pack the selected feature columns into a single vector column named "features".
vectorizer = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Preview five assembled rows next to their label.
(
    vectorizer
    .transform(df)
    .select("features", "Class")
    .limit(5)
    .toPandas()
)
Out[28]:
In [30]:
# Random forest predicting the "Class" label, with each tree capped at depth 5.
# The seed is pinned (same value as the train/test split) so that refitting
# after a kernel restart does not depend on the library's default seed and the
# reported metrics stay comparable between runs.
est = RandomForestClassifier(labelCol="Class", maxDepth=5, seed=1)

# Display the configured estimator as the cell output.
est
Out[30]:
In [31]:
# Print the text returned by the estimator's explainParams() so the available
# parameters can be reviewed before fitting.
print(est.explainParams())
In [33]:
# 70/30 train/test split; the fixed seed keeps the split repeatable.
df_train, df_test = df.randomSplit([0.7, 0.3], seed=1)
In [34]:
# Chain feature assembly and the classifier, then fit both stages on the
# training split only.
pipeline = Pipeline(stages=[vectorizer, est])
model = pipeline.fit(df_train)
In [45]:
# Apply the fitted pipeline to the held-out split; the result is kept so later
# cells can read its "Class" and "prediction" columns.
df_test_pred = model.transform(df_test)
In [46]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
In [47]:
# Binary-classification evaluator reading the true label from "Class"; the
# metric is left at the library default.
evaluator = BinaryClassificationEvaluator(labelCol="Class")

# Display the configured evaluator as the cell output.
evaluator
Out[47]:
In [48]:
# Score the held-out predictions computed earlier (df_test_pred is
# model.transform(df_test)) instead of rebuilding the same transform.
evaluator.evaluate(df_test_pred)
Out[48]:
In [53]:
from pyspark.sql.functions import *
In [59]:
# Fraction of held-out rows whose predicted class equals the true class:
# flag each row as correct or not, cast the flag to float, and average it.
# The result is a single Row holding that average.
test_accuracy = (
    df_test_pred
    .select("Class", "prediction")
    .withColumn("isEqual", expr("Class == prediction"))
    .agg(avg(expr("cast(isEqual as float)")))
    .head()
)
In [60]:
# Display the Row computed in the previous cell.
test_accuracy
Out[60]:
In [67]:
# Same pipeline shape as before, but with a single decision tree that splits
# on entropy. NOTE: this rebinds `pipeline` and `model`, so the cells that
# follow evaluate the tree, not the earlier random forest.
treeEstimator = DecisionTreeClassifier(labelCol="Class", impurity="entropy")
pipeline = Pipeline(stages=[vectorizer, treeEstimator])
model = pipeline.fit(df_train)

# Score the tree on the held-out split with the binary evaluator defined above.
evaluator.evaluate(model.transform(df_test))
Out[67]:
In [75]:
# Accuracy of the current `model` on the held-out split, using "Class" as the
# true label.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol="Class",
    metricName="accuracy",
)
accuracy_evaluator.evaluate(model.transform(df_test))
Out[75]:
In [78]:
# F1 score of the current `model` on the held-out split, using "Class" as the
# true label.
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="Class",
    metricName="f1",
)
f1_evaluator.evaluate(model.transform(df_test))
Out[78]:
In [ ]: