Introduction to Spark Graphframes and Sysmon


Goals:

  • Become familiar with the basics of Spark GraphFrames
  • Confirm that Jupyter can import the GraphFrames library
  • Confirm that Spark can read data from Elasticsearch
  • Learn to create a GraphFrame from the Sysmon index
  • Learn the basics of GraphFrames motifs

What is GraphFrames?

GraphFrames is a package for Apache Spark which provides DataFrame-based Graphs.

  • It provides high-level APIs in Scala, Java, and Python.
  • It aims to provide both the functionality of GraphX and extended functionality taking advantage of Spark DataFrames.
  • This extended functionality includes motif finding, DataFrame-based serialization, and highly expressive graph queries.

What is a graph?

GraphFrames represents a graph as two DataFrames:

  • Vertices (e.g., users)
  • Edges (e.g., relationships between users).
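
As a rough illustration of that structure, the plain-Python sketch below models a graph as two tables, one row per vertex and one row per edge, and checks that every edge endpoint refers to a known vertex id. The column names id, src, and dst follow the convention used later in this notebook; the helper dangling_edges is hypothetical and is not part of GraphFrames.

```python
# Vertex table: one row per entity, keyed by a unique "id".
vertices = [
    {"id": "a", "name": "Alice"},
    {"id": "b", "name": "Bob"},
]

# Edge table: one row per relationship, pointing from "src" to "dst".
edges = [
    {"src": "a", "dst": "b", "relationship": "friend"},
]


def dangling_edges(vertices, edges):
    """Return the edges whose src or dst is not a known vertex id (hypothetical helper)."""
    known_ids = {v["id"] for v in vertices}
    return [
        e for e in edges
        if e["src"] not in known_ids or e["dst"] not in known_ids
    ]


# An empty list means every edge connects two known vertices.
print(dangling_edges(vertices, edges))
```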

Import SparkSession Class


In [1]:
from pyspark.sql import SparkSession

Create a SparkSession instance

  • Define a spark variable
  • Pass values to the appName and master functions
    • For the master function, we are going to use the HELK's Spark Master container (helk-spark-master)
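  • The config() function, described below, can set the Elasticsearch information needed to read from it; in this notebook those settings are instead passed as options on the reader in a later cell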

config(key=None, value=None, conf=None)

  • Sets a config option.
  • Options set using this method are automatically propagated to both SparkConf and SparkSession’s own configuration.
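
A minimal sketch of how config(key, value) might be chained on the builder is shown below. The helper apply_options is hypothetical. The option keys are assumptions: the elasticsearch-hadoop connector is documented to read its settings from the Spark configuration when they carry a spark.es. prefix, but the exact keys should be checked against the connector version in use. The part that needs a running cluster is left as comments.

```python
def apply_options(builder, options):
    """Call builder.config(key, value) once per option and return the builder.

    Hypothetical helper; works with any object whose config() returns a builder.
    """
    for key, value in options.items():
        builder = builder.config(key, value)
    return builder


# Assumed keys: Elasticsearch settings with the "spark." prefix.
es_options = {
    "spark.es.nodes": "helk-elasticsearch:9200",
    "spark.es.net.http.auth.user": "elastic",
}

# Usage against a real cluster (requires pyspark and the HELK containers):
# from pyspark.sql import SparkSession
# builder = (SparkSession.builder
#            .appName("HELK Graphs")
#            .master("spark://helk-spark-master:7077"))
# spark = apply_options(builder, es_options).getOrCreate()
```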

In [2]:
spark = SparkSession.builder \
    .appName("HELK Graphs") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

Check the SparkSession variable


In [3]:
spark


Out[3]:

SparkSession - hive

SparkContext

Spark UI

Version
v2.4.3
Master
spark://helk-spark-master:7077
AppName
HELK Graphs

Import Graphframes & SQL Functions


In [4]:
from graphframes import *
from pyspark.sql.functions import *

In [5]:
%%time
# Create a Vertex DataFrame with unique ID column "id"
v = spark.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = spark.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
], ["src", "dst", "relationship"])
# Create a GraphFrame
from graphframes import *
g = GraphFrame(v, e)

# Query: Get in-degree of each vertex.
g.inDegrees.show()

# Query: Count the number of "follow" connections in the graph.
g.edges.filter("relationship = 'follow'").count()


+---+--------+
| id|inDegree|
+---+--------+
|  c|       1|
|  b|       2|
+---+--------+

CPU times: user 348 ms, sys: 380 ms, total: 728 ms
Wall time: 31.6 s
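
To make the two queries above concrete, here is a plain-Python sketch that computes the same quantities from the same three toy edges without Spark. It is only meant to show what in-degree and the filter count mean, not how GraphFrames computes them.

```python
from collections import Counter

# Same toy edges as the cell above: (src, dst, relationship).
edges = [
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
]

# In-degree of a vertex = number of edges whose dst is that vertex.
# Vertices with no incoming edge (here "a") do not appear at all.
in_degrees = Counter(dst for _src, dst, _rel in edges)

# Number of edges labelled "follow".
follow_count = sum(1 for _src, _dst, rel in edges if rel == "follow")

print(dict(in_degrees))  # {'b': 2, 'c': 1}
print(follow_count)      # 2
```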

Read data from the HELK Elasticsearch via Spark SQL

Using the DataFrame API to access an Elasticsearch index (the Sysmon index)

  • Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
  • Elasticsearch becomes a native source for Spark SQL, so data can be indexed and queried from Spark SQL transparently.
  • Spark SQL works with structured data - in other words, all entries are expected to have the same structure (same number of fields, of the same type and name)
  • Using unstructured data (documents with different structures) is not supported and will cause problems.
  • Through the org.elasticsearch.spark.sql package, esDF methods are available on the SQLContext API

Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
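
The "same structure" requirement can be illustrated with a small plain-Python check. This is a sketch under stated assumptions: the helpers field_signature and inconsistent_docs are hypothetical, the sample documents are made up, and only top-level fields are compared. A field that is a string in some documents and a list in others is one kind of mismatch this catches.

```python
def field_signature(doc):
    """Map each top-level field name to the name of its Python type."""
    return {name: type(value).__name__ for name, value in doc.items()}


def inconsistent_docs(docs):
    """Return the documents whose signature differs from the first document's."""
    if not docs:
        return []
    expected = field_signature(docs[0])
    return [d for d in docs if field_signature(d) != expected]


# Made-up documents; the third stores "action" as a list instead of a string.
docs = [
    {"process_name": "conhost.exe", "action": "processcreate"},
    {"process_name": "whoami.exe", "action": "processcreate"},
    {"process_name": "gpupdate.exe", "action": ["processcreate"]},
]

print(inconsistent_docs(docs))
```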


In [6]:
es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes","helk-elasticsearch:9200")
    .option("es.net.http.auth.user","elastic")
)
    #PLEASE REMEMBER!!!!
    #If you are using elastic TRIAL license, then you need the es.net.http.auth.pass config option set
    #Example: .option("es.net.http.auth.pass","elasticpassword")

load(path=None, format=None, schema=None, **options)

  • Loads data from a data source and returns it as a DataFrame.

In [7]:
%%time
sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")


CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.58 s

ProcessCreate & Motifs

Create Vertices Dataframe

We are going to copy process_guid into a new column named id, because that is the column name GraphFrames uses for vertex IDs.

withColumn(colName, col)

  • Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
  • The column expression must be an expression over this DataFrame; attempting to add a column from some other dataframe will raise an error.

In [8]:
vertices = (sysmon_df.withColumn("id", sysmon_df.process_guid)
            .select("id","user_name","host_name","process_parent_name","process_name","action")
           )
vertices = vertices.filter(vertices.action == "processcreate")

In [9]:
%%time
vertices.show(5,truncate=False)


+------------------------------------+---------------+---------------+-------------------+----------------------+-------------+
|id                                  |user_name      |host_name      |process_parent_name|process_name          |action       |
+------------------------------------+---------------+---------------+-------------------+----------------------+-------------+
|aa6b4a20-7cd9-5ce0-0000-0010a3801e00|pgustavo       |it001.shire.com|svchost.exe        |backgroundtaskhost.exe|processcreate|
|aa6b4a20-7cdf-5ce0-0000-00105eac1e00|pgustavo       |it001.shire.com|powershell.exe     |conhost.exe           |processcreate|
|aa6b4a20-7d15-5ce0-0000-0010f07f1f00|pgustavo       |it001.shire.com|svchost.exe        |backgroundtaskhost.exe|processcreate|
|aa6b4a20-7d16-5ce0-0000-001089921f00|pgustavo       |it001.shire.com|svchost.exe        |runtimebroker.exe     |processcreate|
|03ba39f5-7d20-5ce0-0000-001052da2002|network service|hr001.shire.com|svchost.exe        |gpupdate.exe          |processcreate|
+------------------------------------+---------------+---------------+-------------------+----------------------+-------------+
only showing top 5 rows

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 1.83 s

Create Edges Dataframe

We also rename process_parent_guid to src and process_guid to dst, the column names GraphFrames expects for edges. This lets us look for the parent-child relationship across our whole environment.

selectExpr(*expr)

  • Projects a set of SQL expressions and returns a new DataFrame.
  • This is a variant of select() that accepts SQL expressions.
  • You can also combine selecting columns and renaming columns in a single step with selectExpr
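
The filter, rename, and constant-column steps used for the edges can be sketched in plain Python as follows. The helper to_edge, the GUID values, and the second action name are placeholders made up for illustration; only the field names come from this notebook.

```python
def to_edge(record):
    """Rename the GUID fields to src/dst and tag the edge as 'spawned'."""
    return {
        "src": record["process_parent_guid"],
        "dst": record["process_guid"],
        "relationship": "spawned",
    }


# Placeholder records; real GUIDs look like the ones in the vertices output.
records = [
    {"process_parent_guid": "parent-1", "process_guid": "child-1",
     "action": "processcreate"},
    {"process_parent_guid": "parent-1", "process_guid": "child-2",
     "action": "some-other-action"},
]

# Keep only process-creation events, then project them into edge rows.
edges = [to_edge(r) for r in records if r["action"] == "processcreate"]
print(edges)
```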

In [10]:
edges = (sysmon_df
         .filter(sysmon_df.action == "processcreate")
         .selectExpr("process_parent_guid as src","process_guid as dst")
         .withColumn("relationship", lit("spawned"))
        )

In [11]:
%%time
edges.show(5,truncate=False)


+------------------------------------+------------------------------------+------------+
|src                                 |dst                                 |relationship|
+------------------------------------+------------------------------------+------------+
|aa6b4a20-7719-5ce0-0000-001068a30000|aa6b4a20-7cd9-5ce0-0000-0010a3801e00|spawned     |
|aa6b4a20-7cde-5ce0-0000-00109ea71e00|aa6b4a20-7cdf-5ce0-0000-00105eac1e00|spawned     |
|aa6b4a20-7719-5ce0-0000-001068a30000|aa6b4a20-7d15-5ce0-0000-0010f07f1f00|spawned     |
|aa6b4a20-7719-5ce0-0000-001068a30000|aa6b4a20-7d16-5ce0-0000-001089921f00|spawned     |
|03ba39f5-ea64-5ccb-0000-0010c91c0100|03ba39f5-7d20-5ce0-0000-001052da2002|spawned     |
+------------------------------------+------------------------------------+------------+
only showing top 5 rows

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 453 ms

Create a Graph (Vertices & Edges DataFrames)


In [12]:
g = GraphFrame(vertices, edges)

Process A spawning Process B AND Process B Spawning Process C


In [13]:
%%time
motifs = g.find("(a)-[]->(b);(b)-[]->(c)")


CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 405 ms
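
Conceptually, the pattern "(a)-[]->(b);(b)-[]->(c)" asks for every pair of edges where the destination of the first is the source of the second. The plain-Python sketch below shows that idea as a join of the edge list with itself. It is not how GraphFrames executes the query, and the short process names stand in for the GUIDs used as vertex ids in the real graph.

```python
from collections import defaultdict


def two_hop_chains(edges):
    """Return every (a, b, c) such that a -> b and b -> c are both edges."""
    # Index edges by source so the children of any vertex can be looked up.
    children = defaultdict(list)
    for src, dst in edges:
        children[src].append(dst)

    chains = []
    for a, b in edges:
        for c in children.get(b, []):
            chains.append((a, b, c))
    return chains


# Illustrative ids only; the real vertex ids are process GUIDs.
edges = [
    ("wmiprvse", "powershell"),
    ("powershell", "conhost"),
    ("powershell", "whoami"),
]

print(two_hop_chains(edges))
# [('wmiprvse', 'powershell', 'conhost'), ('wmiprvse', 'powershell', 'whoami')]
```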

In [14]:
%%time
(motifs
     .select("a.process_parent_name","a.process_name","b.process_name","c.process_name")
     .show(20,truncate=False)
)


+-------------------+------------+--------------+------------+
|process_parent_name|process_name|process_name  |process_name|
+-------------------+------------+--------------+------------+
|svchost.exe        |wmiprvse.exe|powershell.exe|conhost.exe |
|svchost.exe        |wmiprvse.exe|powershell.exe|whoami.exe  |
|explorer.exe       |wscript.exe |powershell.exe|conhost.exe |
+-------------------+------------+--------------+------------+

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 15.3 s

In [15]:
%%time
motifs.groupby('a.process_parent_name').count().sort('count').show(10)


+-------------------+-----+
|process_parent_name|count|
+-------------------+-----+
|       explorer.exe|    1|
|        svchost.exe|    2|
+-------------------+-----+

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 13.8 s
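
The grouping step can be checked by hand against the three motif rows printed by cell In [14]. This plain-Python sketch counts the a.process_parent_name values and sorts them by ascending count, mirroring groupby(...).count().sort('count').

```python
from collections import Counter

# a.process_parent_name for each of the three motif rows shown earlier.
parent_names = ["svchost.exe", "svchost.exe", "explorer.exe"]

counts = Counter(parent_names)

# Sort ascending by count, like .sort('count').
ordered = sorted(counts.items(), key=lambda item: item[1])

for name, count in ordered:
    print(name, count)
# explorer.exe 1
# svchost.exe 2
```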