In [1]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import Row
import operator
from pyspark.mllib.clustering import KMeans

In [2]:
spark = SparkSession.builder \
        .master("local") \
        .appName("Anomalies Detection") \
        .getOrCreate()

sparkCt = spark.sparkContext

In [9]:
def to_onehot(lst, indices, unique_values, c):
    # One-hot encode the categorical positions of `lst` (given by `indices`)
    # against the global list of distinct values, then append the remaining
    # numerical fields, cast to float, unchanged.
    zs = [0.0]*c
    rest_lst = [float(lst[k]) for k in range(len(lst)) if k not in indices]
    for pos in indices:
        idx = unique_values.index(Row(lst[pos]))
        zs[idx] = 1.0
    zs.extend(rest_lst)
    return zs
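
# Illustrative sketch only (the values below are made up, not taken from the dataset):
# with indices = [0, 1], unique_values = [Row("tcp"), Row("udp"), Row("SF")] and c = 3,
#   to_onehot(["tcp", "SF", "-0.15"], [0, 1], [Row("tcp"), Row("udp"), Row("SF")], 3)
#   => [1.0, 0.0, 1.0, -0.15]
# i.e. the two categorical fields light up their one-hot positions and the
# numerical tail is appended as floats.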

class AnomalyDetection():

    def readData(self, filename):
        # Read the Parquet input and cache it; it is scanned more than once below.
        self.rawDF = spark.read.parquet(filename).cache()


    def cat2Num(self, df, indices):
        # Collect the distinct values of every categorical column so that the
        # one-hot positions are consistent across the whole DataFrame.
        unique_values = []
        for i in indices:
            d = udf(lambda r: r[i], StringType())
            dt = df.select(d(df.rawFeatures)).distinct().collect()
            unique_values.extend(dt)

        # Replace the categorical fields with their one-hot encoding and keep
        # the numerical fields as doubles in a new "features" column.
        unique_count = len(unique_values)
        convertUDF = udf(lambda r: to_onehot(r, indices, unique_values, unique_count), ArrayType(DoubleType()))
        newdf = df.withColumn("features", convertUDF(df.rawFeatures))

        return newdf


    def addScore(self, df):
        # Count how many points fall into each cluster.
        cluster_dict = {}
        clusters_list = df.select("prediction").collect()
        for c in clusters_list:
            cluster_dict[c] = cluster_dict.setdefault(c, 0.0) + 1.0

        # Score each point by how small its cluster is:
        #   score = (N_max - N_cluster) / (N_max - N_min)
        # so points in the largest cluster score 0.0 and points in the smallest score 1.0.
        sorted_clusters = sorted(cluster_dict.items(), key=operator.itemgetter(1))  # sort by cluster size
        n_max = sorted_clusters[-1][1]
        n_min = sorted_clusters[0][1]
        score_udf = udf(lambda p: float(n_max - cluster_dict.get(Row(p))) / (n_max - n_min), DoubleType())
        score_df = df.withColumn("score", score_udf(df.prediction))
        return score_df
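
    # Worked example (hypothetical cluster sizes, for illustration only):
    # if the counts are {0: 500, 1: 200, 2: 10}, then n_max = 500 and n_min = 10,
    # so a point in cluster 0 scores (500-500)/490 = 0.0, cluster 1 scores
    # (500-200)/490 ≈ 0.61, and cluster 2 scores (500-10)/490 = 1.0;
    # the rarest cluster is treated as the most anomalous.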


    def detect(self, k, t):
        # Encoding the categorical features using one-hot.
        df1 = self.cat2Num(self.rawDF, [0, 1]).cache()

        # Clustering the points using KMeans.
        features = df1.select("features").rdd.map(lambda row: row[0]).cache()
        model = KMeans.train(features, k, maxIterations=40, initializationMode="random", seed=20)

        # Adding the prediction column to df1. The model is broadcast so each
        # executor gets one copy, and the cluster id is converted to a string
        # to match the declared StringType of the UDF.
        modelBC = sparkCt.broadcast(model)
        predictUDF = udf(lambda x: str(modelBC.value.predict(x)), StringType())
        df2 = df1.withColumn("prediction", predictUDF(df1.features)).cache()

        # Adding the score column to df2; the higher the score, the more likely the point is an anomaly.
        df3 = self.addScore(df2).cache()

        # Keep only the points whose score exceeds the threshold t.
        return df3.where(df3.score > t)

In [19]:
import os
from mlflow import log_metric, log_param, log_artifact

def main():
    ad = AnomalyDetection()
    inputs = "data/logs-features-sample/"
    ad.readData(inputs)
    anomalies = ad.detect(8, 0.97)

    # Log the size of the result to MLflow.
    log_metric("output_records", anomalies.count())
    log_param("output_columns", len(anomalies.columns))

    anomalies.show()

    # Log a small marker file as a run artifact.
    with open("mlflow_output.txt", "w") as f:
        f.write("Finished running!")
    log_artifact("mlflow_output.txt")

if __name__ == "__main__":
    main()
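
# Optional sketch (not part of the original run): the logging calls above rely on
# MLflow's implicit default run; they could equally be grouped under an explicit,
# named run so related parameters, metrics, and artifacts show up together.
#
#   import mlflow
#   with mlflow.start_run(run_name="anomaly-detection"):
#       main()
#
# The run name "anomaly-detection" is an assumption chosen for illustration.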


+-----+--------------------+--------------------+----------+-----+
|   id|         rawFeatures|            features|prediction|score|
+-----+--------------------+--------------------+----------+-----+
|44362|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|44432|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|44776|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|44928|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|45317|[tcp, SF, -0.1578...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|45323|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|45669|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|45676|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|45986|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|46341|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|46376|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|46391|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|46547|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|46872|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|47450|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|47951|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|48194|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|48281|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|48569|[tcp, SF, 0.47745...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
|48883|[tcp, SF, -0.1585...|[1.0, 0.0, 0.0, 0...|         7|  1.0|
+-----+--------------------+--------------------+----------+-----+
only showing top 20 rows

NOTES

  • Parameters, metrics, and artifacts are all written to the local "mlruns" folder.
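  • To browse the logged runs locally, the MLflow UI can be started from the same
    working directory (this assumes the mlflow CLI is installed alongside the
    Python package):

        mlflow ui

    and then opened in a browser at http://127.0.0.1:5000 (the default address).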