Credit Card Transactions Fraud Detection Example:

The notebook demonstrates how to develop a fraud detection application with the BigDL deep learning library on Apache Spark. We'll try to introduce some techniques that can be used for training a fraud detection model, but some advanced skills is not applicable since the dataset is highly simplified.

Dataset: Credit Card Fraud Detection https://www.kaggle.com/mlg-ulb/creditcardfraud/data

This dataset presents transactions that occurred in two days, where we got 492 frauds out of 284,807 transactions. The dataset is highly unbalanced, the positive class (frauds) account for 0.172% of all transactions.

It contains only numerical input variables which are the result of a PCA transformation. Unfortunately, due to confidentiality issues, we cannot find the original features and more background information about the data. Features V1, V2, ... V28 are the principal components obtained with PCA, the only features which have not been transformed with PCA are 'Time' and 'Amount'. Feature 'Time' contains the seconds elapsed between each transaction and the first transaction in the dataset. The feature 'Amount' is the transaction Amount, this feature can be used for example-dependant cost-senstive learning. Feature 'Class' is the response variable and it takes value 1 in case of fraud and 0 otherwise.

Software stack: Scala 2.11, Spark 2.0 or above, BigDL 0.4 or above

Contact: yuhao.yang@intel.com

Loading data from csv files and output the schema:


In [2]:
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.tensor.TensorNumericMath.TensorNumeric.NumericDouble
import com.intel.analytics.bigdl.utils.Engine
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.{FuncTransformer, StandardScaler, StratifiedSampler, VectorAssembler}
import org.apache.spark.ml.{DLClassifier, Pipeline}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, StructField}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import com.intel.analytics.bigdl.utils.LoggerFilter
import com.intel.analytics.bigdl.dlframes.DLModel
LoggerFilter.redirectSparkInfoLogs()
Logger.getLogger("com.intel.analytics.bigdl.optim").setLevel(Level.INFO)

val conf = Engine.createSparkConf()
val spark = SparkSession.builder().master("local[4]").appName("Fraud Detection Example").config(conf).getOrCreate()
import spark.implicits._
Engine.init

val raw = spark.read.format("csv").option("header", "true").option("mode", "DROPMALFORMED").csv("/tmp/datasets/creditcard.csv")

// cast all the column to Double type.
val df = raw.select(((1 to 28).map(i => "V" + i) ++ Array("Time", "Amount", "Class")).map(s => col(s).cast("Double")): _*)

println("num of records: " + df.count())

// select a few columns to show.
df.select("V1", "V2", "Time", "Amount", "Class").show()
println(" Class statistics: 1 represents fraud and 0 represents normal")
df.groupBy("Class").count().show()


num of records: 284807                                                          
+------------------+-------------------+----+------+-----+
|                V1|                 V2|Time|Amount|Class|
+------------------+-------------------+----+------+-----+
|  -1.3598071336738|-0.0727811733098497| 0.0|149.62|  0.0|
|  1.19185711131486|   0.26615071205963| 0.0|  2.69|  0.0|
| -1.35835406159823|  -1.34016307473609| 1.0|378.66|  0.0|
|-0.966271711572087| -0.185226008082898| 1.0| 123.5|  0.0|
| -1.15823309349523|  0.877736754848451| 2.0| 69.99|  0.0|
|-0.425965884412454|  0.960523044882985| 2.0|  3.67|  0.0|
|  1.22965763450793|  0.141003507049326| 4.0|  4.99|  0.0|
|-0.644269442348146|   1.41796354547385| 7.0|  40.8|  0.0|
| -0.89428608220282|  0.286157196276544| 7.0|  93.2|  0.0|
| -0.33826175242575|   1.11959337641566| 9.0|  3.68|  0.0|
|  1.44904378114715|  -1.17633882535966|10.0|   7.8|  0.0|
|  0.38497821518095|  0.616109459176472|10.0|  9.99|  0.0|
|    1.249998742053|  -1.22163680921816|10.0| 121.5|  0.0|
|   1.0693735878819|  0.287722129331455|11.0|  27.5|  0.0|
|  -2.7918547659339| -0.327770756658658|12.0|  58.8|  0.0|
|-0.752417042956605|  0.345485415344747|12.0| 15.99|  0.0|
|  1.10321543528383|-0.0402962145973447|12.0| 12.99|  0.0|
|-0.436905071360625|  0.918966212909322|13.0|  0.89|  0.0|
| -5.40125766315825|  -5.45014783420644|14.0|  46.8|  0.0|
|   1.4929359769862|  -1.02934573189487|15.0|   5.0|  0.0|
+------------------+-------------------+----+------+-----+
only showing top 20 rows

 Class statistics: 1 represents fraud and 0 represents normal
[Stage 20:=============================>                            (1 + 1) / 2]+-----+------+
|Class| count|
+-----+------+
|  0.0|284315|
|  1.0|   492|
+-----+------+

Feature analysis:

Normally it would improve the model if we could derive more features from the raw transaction records. E.g.

days to last transaction,
distance with last transaction,
amount percentage over the last 1 month / 3months
...

Yet with the public dataset, we can hardly derive any extention features from the PCA result. So here we only introduce several general practices:

Usually there's a lot of categorical data in the raw dataset, E.g. post code, card type, merchandise id, seller id, etc. 1). For categorical feature with limited candidate values, like card type, channel id, just use OneHotEncoder. 2). For categorical feature with many candidate values, like merchandise id, post code or even phone number, suggest to use Weight of Evidence. 3). You can also use FeatureHasher from Spark MLlib which will be release with Spark 2.3.

For this dataset, essentially it's a classification problem with highly unbalanced data set.

Approach

  1. We will build a feature transform pipeline with Apache Spark and some of our transformers.
  2. We will run some inital statistical analysis and split the dataset for training and validation.
  3. We will build the model with BigDL.
  4. We will compare different strategy to handle the unbalance.

Details of each step is as follows:

step 1. Build an inital pipeline for feature transform.

For each training records, we intend to aggregate all the features into one Spark Vector, which will then be sent to BigDL model for the training. First we'd like to introduce one handy transformer that we developed to help user build custom Transformers for Spark ML Pipeline.

class FuncTransformer (
  override val uid: String,
  val func: UserDefinedFunction
) extends Transformer with HasInputCol with HasOutputCol with DefaultParamsWritable {

FuncTransformer takes an udf as the constructor parameter and use the udf to perform the actual transform. The transformer can be saved/loaded as other transformer and can be integrated into a pipeline normally. It can be used widely in many use cases like conditional conversion(if...else...), , type conversion, to/from Array, to/from Vector and many string ops. Some examples:

val labelConverter = new FuncTransformer(udf { i: Double => if (i >= 1) 1 else 0 })

val shifter = new FuncTransformer(udf { i: Double => i + 1 })

val toVector = new FuncTransformer(udf { i: Double => Vectors.dense(i) })

We will use VectorAssembler to compose the all the Vx columns and append the Amount column. Then use StandardScaler to normlize the training records. Since in BigDL, the criterion generally only accepts 1, 2, 3... as the Label, so we will replace all the 0 with 2 in the training data.


In [3]:
// convert the label from {0, 1} to {1, 2}
val labelConverter = new FuncTransformer(udf {d: Double => if (d==0) 2 else d }).setInputCol("Class").setOutputCol("Class")
val assembler = new VectorAssembler().setInputCols((1 to 28).map(i => "V" + i).toArray ++ Array("Amount")).setOutputCol("assembled")
val scaler = new StandardScaler().setInputCol("assembled").setOutputCol("features")
val pipeline = new Pipeline().setStages(Array(assembler, scaler, labelConverter))
val pipelineModel = pipeline.fit(df)
val data = pipelineModel.transform(df)

println("Generate feature from raw data:")
data.select("features", "Class").show()


[Stage 30:=============================>                            (1 + 1) / 2]Generate feature from raw data:
+--------------------+-----+
|            features|Class|
+--------------------+-----+
|[-0.6942411021638...|  2.0|
|[0.60849525943109...|  2.0|
|[-0.6934992452238...|  2.0|
|[-0.4933240320774...|  2.0|
|[-0.5913287255806...|  2.0|
|[-0.2174742415711...|  2.0|
|[0.62779408220828...|  2.0|
|[-0.3289277697329...|  2.0|
|[-0.4565722152677...|  2.0|
|[-0.1726974406947...|  2.0|
|[0.73980031932134...|  2.0|
|[0.19654824114227...|  2.0|
|[0.63817910856358...|  2.0|
|[0.54596205586179...|  2.0|
|[-1.4253641430361...|  2.0|
|[-0.3841418567777...|  2.0|
|[0.56323980125506...|  2.0|
|[-0.2230591756515...|  2.0|
|[-2.7575786155874...|  2.0|
|[0.76220920780347...|  2.0|
+--------------------+-----+
only showing top 20 rows

step 2. split the dataset into training and validation dataset.

Unlike some other training dataset, where the data does not have a time of occurance. For this case, we can know the sequence of the transactions from the Time column. Thus randomly splitting the data into training and validation does not make much sense, since in real world applications, we can only use the history transactions for training and use the latest transactions for validation. Thus we'll split the dataset according the time of occurance.


In [4]:
// get the time to split the data.
    val splitTime = data.stat.approxQuantile("Time", Array(0.7), 0.001).head

    val trainingData = data.filter(s"Time<$splitTime").cache()
    val validData = data.filter(s"Time>=$splitTime").cache()

    println("Split data into Training and Validation: ")
    println("training records count: " + trainingData.count())
    println("validation records count: " + validData.count())


[Stage 32:=============================>                            (1 + 1) / 2]Split data into Training and Validation: 
training records count: 199096                                                  
validation records count: 85711                                                 

step 3. Build the model with BigDL

From the research community and industry feedback, a simple neural network turns out be the perfect candidate for the fraud detection training. We will quickly build a multiple layer Perceptron with linear layers.

    val bigDLModel = Sequential()
      .add(Linear(29, 10))
      .add(Linear(10, 2))
      .add(LogSoftMax())
    val criterion = ClassNLLCriterion()

BigDL provides DLEstimator and DLClassifier for users with Apache Spark MLlib experience, which provides high level API for training a BigDL Model with the Apache Spark Estimator/Transfomer pattern, thus users can conveniently fit BigDL into a ML pipeline. The fitted model DLModel and DLClassiferModel contains the trained BigDL model and extends the Spark ML Model class. Alternatively users may also construct a DLModel with a pre-trained BigDL model to use it in Spark ML Pipeline for prediction.

DLClassifier is a specialized DLEstimator that simplifies the data format for classification tasks. It only supports label column of DoubleType, and the fitted DLClassifierModel will have the prediction column of DoubleType.

For this case we'll just use DLClassifier for the training. Note that users can set differet optimization mothod, batch size and epoch number.


In [5]:
val bigDLModel = Sequential().add(Linear(29, 10)).add(Linear(10, 2)).add(LogSoftMax())
val criterion = ClassNLLCriterion()
val dlClassifier = new DLClassifier(bigDLModel, criterion, Array(29)).setLabelCol("Class").setLearningRate(3e-2).setBatchSize(10000).setMaxEpoch(100)
val model = dlClassifier.fit(trainingData)
println("\ninitial model training finished.")


[Stage 37:>                                                         (0 + 1) / 2]
initial model training finished.

Now we have finished the training of our first model (which is certainly not the best, keep reading!).

We'll need to think about how do evaluate the trained model:

Given the class imbalance ratio, we recommend measuring the accuracy using the Area Under the Precision-Recall Curve (AUPRC). Confusion matrix accuracy is not meaningful for unbalanced classification. Since even if the model predicts all the records as normal transactions, it will still get an accuracy above 99%.


In [6]:
val predictionDF = model.transform(validData)

// convert the prediction and label column back to {0, 1}
def evaluateModel(predictionDF: DataFrame): Unit = {
  predictionDF.cache()
  val labelConverter2 = new FuncTransformer(udf {d: Double => if (d==2) 0 else d }).setInputCol("Class").setOutputCol("Class")
  val labelConverter3 = new FuncTransformer(udf {d: Double => if (d==2) 0 else d }).setInputCol("prediction").setOutputCol("prediction")
  val finalData = labelConverter2.transform(labelConverter3.transform(predictionDF))
    
  val metrics = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("Class")
  val auPRC = metrics.evaluate(finalData)
  println("\nArea under precision-recall curve: = " + auPRC)
    
  val recall = new MulticlassClassificationEvaluator().setLabelCol("Class").setMetricName("weightedRecall").evaluate(finalData)
  println("\nrecall = " + recall)

  val precisoin = new MulticlassClassificationEvaluator().setLabelCol("Class").setMetricName("weightedPrecision").evaluate(finalData)
  println("\nPrecision = " + precisoin)  
  predictionDF.unpersist()
}

evaluateModel(predictionDF)


[Stage 8051:============================>                           (1 + 1) / 2]
Area under precision-recall curve: = 0.7129279174622213

recall = 0.9992066362543897

Precision = 0.9991317216500658

To this point, we have finished the training and evaluation with a simple BigDL model. We can see that even though the recall and precision are high, the area under precision-recall curve is not optimistic. That's because we haven't really apply any technique to handle the imbalanced training data.

Next we'll try to optimize the training process.

step 4. handle the data imbalance

There are several ways to approach this classification problem taking into consideration this unbalance.

  1. Collect more data? Nice strategy but not applicable in this case.

  2. Resampling the dataset Essentially this is a method that will process the data to have an approximate 50-50 ratio. One way to achieve this is by OVER-sampling, which is adding copies of the under-represented class (better when there're little data) Another is UNDER-sampling, which deletes instances from the over-represented class (better when there are lots of data)

  3. Apart from under and over sampling, there is a very popular approach called SMOTE (Synthetic Minority Over-Sampling Technique), which is a combination of oversampling and undersampling, but the oversampling approach is not by replicating minority class but constructing new minority class data instance via an algorithm.

We'll start with Resampling.

Since there're 492 frauds out of 284,807 transactions, to build a reasonable training dataset, we'll use UNDER-sampling for normal transactions and use OVER-sampling for fraud transactions. By using the sampling rate as fraud -> 10, normal -> 0.05, we can get a training dataset of (5K fraud + 14K normal) transactions. We can use the training data to fit a model.

Yet we'll soon find that since there're only 5% of all the normal transactions are included in the training data, the model can only cover 5% of all the normal transactions, which is obviousely not optimistic. So how can we get a better converage for the normal transactions without breaking the ideal ratio in the training dataset?

An immediate improvement would be to train multiple models. For each model, we will run the resampling from the original dataset and get a new training data set. After training, we can select best voting strategy for all the models to make the prediction.

We'll use Ensembling of neural networks.


In [8]:
// Train with Bagging model, takes 5-10 minutes
    val numModel = 10
    val models = new Array[DLModel[Double]](numModel)
    for (i <- 0 until numModel) {
      val sampler = new StratifiedSampler(Map(2 -> 0.05, 1-> 10, 0 -> 1)).setLabel("Class")
      val bootstrapSample = sampler.transform(trainingData)
      val singleModel = new DLClassifier(bigDLModel.cloneModule(), criterion.cloneCriterion(), Array(29))
        .setLabelCol("Class")
        .setBatchSize(10000)
        .setLearningRate(3e-2)
        .setMaxEpoch(100)
        .fit(bootstrapSample)
      models(i) = singleModel
      println("model trained: " + i)
    }


model trained: 0
model trained: 1
model trained: 2
model trained: 3
model trained: 4
model trained: 5
model trained: 6
model trained: 7
model trained: 8
model trained: 9

After fitting, we can tune the voting strategy via threshold. When using Threshold = 15, we can get an improved model. Feel free to try with other thresholds.


In [9]:
val predicts = models.map { m =>
      m.transform(validData).select("prediction").as[Double].rdd
    }
    val aggPredict: RDD[Double] = predicts.reduce { (rdd1: RDD[Double], rdd2: RDD[Double]) =>
      val result = rdd1.zip(rdd2).map { case (p1, p2) => p1 + p2 }
      result
    }

    val threshold = 15
    val rows = validData.toDF().rdd.zip(aggPredict).map { case (row, p) =>
      val q = if(p >= threshold) 0.0 else 1.0
      Row.fromSeq(row.toSeq ++ Seq(q))
    }

    val predictDF = validData.sparkSession.createDataFrame(rows, validData.schema.add(StructField("prediction", DoubleType)))
    evaluateModel(predictDF)


[Stage 24352:===========================>                           (1 + 1) / 2]
Area under precision-recall curve: = 0.9257134355363974

recall = 0.9901529558633081

Precision = 0.9986903630199001

By using bagging model, we can increase the AUPRC from 0.71 to 0.93.

The code used in this notebook can be found from https://github.com/intel-analytics/analytics-zoo/tree/legacy/pipeline/fraudDetection.