Introduction to Elasticsearch and Spark SQL via PySpark


Goals:

  • Practice Spark SQL skills via PySpark
  • Ensure JupyterLab Server, Spark Cluster & Elasticsearch are communicating
  • Learn to read from HELK Elasticsearch indices

Import SparkSession Class


In [1]:
from pyspark.sql import SparkSession

Create a SparkSession instance

  • Define a spark variable
  • Pass values to the appName and master functions
    • For the master function, we are going to use the HELK's Spark Master container (helk-spark-master)
  • The config() function can also be chained here to set the Elasticsearch information needed to read from it; in this notebook those settings are passed as reader options further down

config(key=None, value=None, conf=None)

  • Sets a config option.
  • Options set using this method are automatically propagated to both SparkConf and SparkSession’s own configuration.
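
As a minimal sketch of how config() could be used here, the helper below applies a dictionary of settings to a builder one key at a time. The function name apply_config and the es_settings dictionary are illustrative assumptions, not part of HELK or PySpark; the keys are elasticsearch-hadoop settings that also appear later in this notebook as reader options. Whether the connector picks them up from the session configuration in your setup is an assumption worth verifying.

```python
def apply_config(builder, settings):
    """Call builder.config(key, value) for each setting and return the builder.

    `builder` is expected to behave like SparkSession.builder, whose
    config(key, value) method returns the builder so calls can be chained.
    """
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder


# Hypothetical settings dictionary; the values mirror the reader options
# used later in this notebook.
es_settings = {
    "es.nodes": "helk-elasticsearch:9200",
    "es.read.field.as.array.include": "tags",
}

# Usage (needs a reachable Spark master, so it is left commented out):
# from pyspark.sql import SparkSession
# spark = apply_config(
#     SparkSession.builder.appName("HELK Reader")
#     .master("spark://helk-spark-master:7077"),
#     es_settings,
# ).getOrCreate()
```

Keeping the settings in one dictionary makes it easy to reuse the same values for the session and for the reader.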

In [2]:
spark = SparkSession.builder \
    .appName("HELK Reader") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

Check the SparkSession variable


In [3]:
spark


Out[3]:

SparkSession - hive

SparkContext

Spark UI

Version
v2.4.3
Master
spark://helk-spark-master:7077
AppName
HELK Reader

Read data from the HELK Elasticsearch via Spark SQL

Using the DataFrame API to access an Elasticsearch index (the Elasticsearch Sysmon index)

  • Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
  • With elasticsearch-hadoop, Elasticsearch becomes a native source for Spark SQL, so data can be indexed and queried from Spark SQL transparently
  • Spark SQL works with structured data - in other words, all entries are expected to have the same structure (same number of fields, of the same type and name)
  • Using unstructured data (documents with different structures) is not supported and will cause problems.
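  • Through the org.elasticsearch.spark.sql package, esDF methods are available on the SQLContext API (Scala); in PySpark the same source is selected with format("org.elasticsearch.spark.sql"), as in the next cell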

Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html


In [4]:
es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes","helk-elasticsearch:9200")
    .option("es.net.http.auth.user","elastic")
)
# Remember: if you are using the Elastic TRIAL license, you also need to set
# the es.net.http.auth.pass option inside the chain above.
# Example: .option("es.net.http.auth.pass","elasticpassword")

load(path=None, format=None, schema=None, **options)

  • Loads data from a data source and returns it as a DataFrame.
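
The reader options and the load() call can also be driven from a dictionary, which avoids hard-coding the password in the notebook. The sketch below is only one way to arrange this: the function names es_reader_options and load_index and the environment variable ES_PASSWORD are hypothetical, while format(), options() and load() are the DataFrameReader methods used in this notebook.

```python
import os


def es_reader_options(nodes, user, password=None):
    """Build the option dictionary for the Elasticsearch Spark SQL reader."""
    options = {
        "es.nodes": nodes,
        "es.net.http.auth.user": user,
        "es.read.field.as.array.include": "tags",
    }
    # Only add the password when one is supplied (e.g. with a TRIAL license)
    if password:
        options["es.net.http.auth.pass"] = password
    return options


def load_index(spark, index_pattern, options):
    """Return a DataFrame for index_pattern using the given reader options."""
    return (
        spark.read
        .format("org.elasticsearch.spark.sql")
        .options(**options)
        .load(index_pattern)
    )


# ES_PASSWORD is a hypothetical environment variable name.
opts = es_reader_options(
    "helk-elasticsearch:9200", "elastic", os.environ.get("ES_PASSWORD")
)

# Usage (needs the SparkSession created earlier, so it is left commented out):
# sysmon_df = load_index(spark, "logs-endpoint-winevent-sysmon-*/", opts)
```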

In [5]:
%%time
sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")


CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 3.86 s

Filter Operation

Filter the data to show only events with the action "processcreate", which corresponds to Sysmon Event ID 1


In [6]:
processcreate_df = sysmon_df.filter(sysmon_df.action == "processcreate")

Select Operation

You can select a few columns from your dataframe with the select method.


In [7]:
processcreate_df = processcreate_df.select("process_guid","process_parent_name","process_parent_command_line","process_name","process_command_line","action","@timestamp")

In [8]:
%%time
processcreate_df.show(10)


+--------------------+-------------------+---------------------------+--------------------+--------------------+-------------+--------------------+
|        process_guid|process_parent_name|process_parent_command_line|        process_name|process_command_line|       action|          @timestamp|
+--------------------+-------------------+---------------------------+--------------------+--------------------+-------------+--------------------+
|aa6b4a20-7cd9-5ce...|        svchost.exe|       c:\windows\system...|backgroundtaskhos...|"c:\windows\syste...|processcreate|2019-05-18 21:44:...|
|aa6b4a20-7cdf-5ce...|     powershell.exe|       c:\windows\system...|         conhost.exe|\??\c:\windows\sy...|processcreate|2019-05-18 21:45:...|
|aa6b4a20-7d15-5ce...|        svchost.exe|       c:\windows\system...|backgroundtaskhos...|"c:\windows\syste...|processcreate|2019-05-18 21:45:...|
|aa6b4a20-7d16-5ce...|        svchost.exe|       c:\windows\system...|   runtimebroker.exe|c:\windows\system...|processcreate|2019-05-18 21:45:...|
|03ba39f5-7d20-5ce...|        svchost.exe|       c:\windows\system...|        gpupdate.exe|gpupdate.exe /tar...|processcreate|2019-05-18 21:46:...|
|03ba39f5-7d20-5ce...|       gpupdate.exe|       gpupdate.exe /tar...|         conhost.exe|\??\c:\windows\sy...|processcreate|2019-05-18 21:46:...|
|03ba39f5-7d20-5ce...|       services.exe|       c:\windows\system...|         svchost.exe|c:\windows\system...|processcreate|2019-05-18 21:46:...|
|aa6b4a20-7ce9-5ce...|        svchost.exe|       c:\windows\system...|       taskhostw.exe|taskhostw.exe key...|processcreate|2019-05-18 21:45:...|
|aa6b4a20-7cea-5ce...|        svchost.exe|       c:\windows\system...|         dllhost.exe|c:\windows\system...|processcreate|2019-05-18 21:45:...|
|aa6b4a20-7cea-5ce...|        svchost.exe|       c:\windows\system...|       taskhostw.exe|       taskhostw.exe|processcreate|2019-05-18 21:45:...|
+--------------------+-------------------+---------------------------+--------------------+--------------------+-------------+--------------------+
only showing top 10 rows

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 14.6 s
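
The same filter and select can also be written as a SQL statement against a temporary view. The following is a sketch under assumptions: the function name processcreate_via_sql and the view name sysmon_events are made up for illustration; createOrReplaceTempView() and spark.sql() are standard PySpark calls. The @timestamp column is quoted with backticks because of the special character in its name.

```python
def processcreate_via_sql(spark, sysmon_df, view_name="sysmon_events"):
    """Register sysmon_df as a temporary view and query it with SQL."""
    sysmon_df.createOrReplaceTempView(view_name)
    query = f"""
        SELECT process_guid, process_parent_name, process_name,
               process_command_line, `@timestamp`
        FROM {view_name}
        WHERE action = 'processcreate'
        LIMIT 10
    """
    return spark.sql(query)


# Usage (needs the SparkSession and sysmon_df from the cells above):
# processcreate_via_sql(spark, sysmon_df).show()
```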

Create DataFrames from the original Sysmon DataFrame

  • Filter the original sysmon_df DataFrame
  • Select specific columns
  • Display results
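
Since the filter and select steps repeat for every event type, they could be wrapped in a small helper. This is an optional sketch, not how the cells below are written; the name events_by_action is hypothetical, and it relies only on DataFrame indexing, filter() and select().

```python
def events_by_action(df, action, columns):
    """Filter df to one Sysmon action and keep only the given columns."""
    return df.filter(df["action"] == action).select(*columns)


# Usage (needs sysmon_df from the cells above, so it is left commented out):
# networkconnect_df = events_by_action(
#     sysmon_df,
#     "networkconnect",
#     ["process_guid", "dst_ip_addr", "dst_port", "dst_host_name",
#      "action", "@timestamp"],
# )
# networkconnect_df.show(10, truncate=False)
```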

NetworkConnect Events

We are going to use the network events logged by Sysmon (Event ID 3)


In [9]:
networkconnect_df = sysmon_df.filter(sysmon_df.action == "networkconnect")

In [10]:
networkconnect_df = networkconnect_df.select("process_guid","dst_ip_addr","dst_port","dst_host_name","action","@timestamp")

In [11]:
networkconnect_df.show(10,truncate=False)


+------------------------------------+-------------+--------+-------------+--------------+-----------------------+
|process_guid                        |dst_ip_addr  |dst_port|dst_host_name|action        |@timestamp             |
+------------------------------------+-------------+--------+-------------+--------------+-----------------------+
|03ba39f5-50b2-5ce0-0000-00109995c501|10.0.10.106  |443     |null         |networkconnect|2019-05-18 21:44:43.063|
|aa6b4a20-7b8d-5ce0-0000-001028031c00|10.0.10.106  |443     |null         |networkconnect|2019-05-18 21:44:51.333|
|905CC552-2045-5CC5-0000-00105B2A0100|172.18.39.102|5985    |null         |networkconnect|2019-05-18 21:44:53.257|
|03ba39f5-652c-5ce0-0000-0010760bff01|10.0.10.106  |443     |null         |networkconnect|2019-05-18 21:44:53.484|
|03ba39f5-6e79-5ce0-0000-001032d21002|10.0.10.106  |443     |null         |networkconnect|2019-05-18 21:44:58.094|
|03ba39f5-50b2-5ce0-0000-00109995c501|10.0.10.106  |443     |null         |networkconnect|2019-05-18 21:44:58.609|
|03ba39f5-652c-5ce0-0000-0010760bff01|10.0.10.106  |443     |null         |networkconnect|2019-05-18 21:44:58.609|
|03ba39f5-ea63-5ccb-0000-001050e60000|172.18.39.105|135     |it001        |networkconnect|2019-05-18 21:45:03.297|
|03ba39f5-6e79-5ce0-0000-001032d21002|10.0.10.106  |443     |null         |networkconnect|2019-05-18 21:45:03.562|
|03ba39f5-652c-5ce0-0000-0010760bff01|10.0.10.106  |443     |null         |networkconnect|2019-05-18 21:45:03.812|
+------------------------------------+-------------+--------+-------------+--------------+-----------------------+
only showing top 10 rows

FileCreate Events


In [12]:
filecreate_df = sysmon_df.filter(sysmon_df.action == "filecreate")

In [13]:
filecreate_df = filecreate_df.select("process_guid","file_name","action","@timestamp")

In [14]:
%%time
filecreate_df.show(10,truncate=False)


+------------------------------------+----------------------------------------------------------------------------------------------+----------+-----------------------+
|process_guid                        |file_name                                                                                     |action    |@timestamp             |
+------------------------------------+----------------------------------------------------------------------------------------------+----------+-----------------------+
|aa6b4a20-7cde-5ce0-0000-00109ea71e00|c:\users\pgustavo\appdata\local\temp\__psscriptpolicytest_kld4kxox.voz.ps1                    |filecreate|2019-05-18 21:45:04.958|
|aa6b4a20-7cde-5ce0-0000-00109ea71e00|c:\users\pgustavo\appdata\local\temp\__psscriptpolicytest_4ksn3cia.csg.psm1                   |filecreate|2019-05-18 21:45:04.958|
|905CC552-2042-5CC5-0000-00103D150100|c:\windows\serviceprofiles\localservice\appdata\local\lastalive1.dat                          |filecreate|2019-05-18 21:45:11.649|
|aa6b4a20-7719-5ce0-0000-001068a30000|c:\windows\temp\his33a6.tmp                                                                   |filecreate|2019-05-18 21:45:11.796|
|aa6b4a20-7735-5ce0-0000-001033f10100|c:\windows\system32\sleepstudy\screenon\screenonpowerstudytracesession-2019-05-18-17-45-11.etl|filecreate|2019-05-18 21:45:11.99 |
|aa6b4a20-7719-5ce0-0000-001068a30000|c:\windows\temp\hisf7a6.tmp                                                                   |filecreate|2019-05-18 21:44:56.433|
|aa6b4a20-7cde-5ce0-0000-00109ea71e00|c:\users\pgustavo\documents\20190518\powershell_transcript.it001.rhzmf_up.20190518174505.txt  |filecreate|2019-05-18 21:45:05.573|
|aa6b4a20-771f-5ce0-0000-00108a420100|c:\windows\prefetch\powershell.exe-920bba2a.pf                                                |filecreate|2019-05-18 21:45:15.754|
|aa6b4a20-771f-5ce0-0000-00108a420100|c:\windows\prefetch\dllhost.exe-d8e67ed6.pf                                                   |filecreate|2019-05-18 21:45:26.147|
|aa6b4a20-771f-5ce0-0000-00108a420100|c:\windows\prefetch\wmiprvse.exe-1628051c.pf                                                  |filecreate|2019-05-18 21:45:15.648|
+------------------------------------+----------------------------------------------------------------------------------------------+----------+-----------------------+
only showing top 10 rows

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 507 ms

Spark SQL JOINs & Sysmon Data

join(other, on=None, how=None)

Joins with another DataFrame, using the given join expression.

Parameters:

  • other – Right side of the join
  • on – a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
  • how – str, default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
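
To make the semantics of an inner equi-join concrete, here is a small illustration in plain Python rather than Spark. The rows are made-up sample data and the function inner_join is hypothetical; it only mimics what join(other, "process_guid", how="inner") does conceptually: every left row is paired with every right row that has the same key, and rows without a match are dropped.

```python
def inner_join(left_rows, right_rows, key):
    """Pair every left row with every right row that shares the same key value."""
    # Index the right side by key so each left row is matched with one lookup
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[key], []).append(row)

    joined = []
    for left in left_rows:
        for right in right_by_key.get(left[key], []):
            # Merge the two rows; the join key appears once in the result
            joined.append({**left, **right})
    return joined


# Made-up sample rows for illustration only
processes = [
    {"process_guid": "A", "process_name": "powershell.exe"},
    {"process_guid": "B", "process_name": "notepad.exe"},
]
connections = [
    {"process_guid": "A", "dst_ip_addr": "10.0.10.106"},
    {"process_guid": "A", "dst_ip_addr": "10.0.10.106"},
]

rows = inner_join(processes, connections, "process_guid")
print(len(rows))  # 2: process A matched twice, process B dropped
```

This is why a process that opened many connections shows up on many rows of a joined result.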

ProcessCreate -> NetworkConnect


In [15]:
process_network_df = processcreate_df.join(networkconnect_df, "process_guid", how="inner")

In [16]:
%%time
process_network_df.select("process_parent_name","process_name","dst_ip_addr").show(truncate=False)


+-------------------+-------------------+--------------+
|process_parent_name|process_name       |dst_ip_addr   |
+-------------------+-------------------+--------------+
|svchost.exe        |microsoftedgecp.exe|13.107.21.200 |
|svchost.exe        |microsoftedgecp.exe|13.107.21.200 |
|svchost.exe        |microsoftedgecp.exe|204.79.197.200|
|svchost.exe        |microsoftedgecp.exe|72.30.2.182   |
|svchost.exe        |microsoftedgecp.exe|204.79.197.203|
|svchost.exe        |microsoftedgecp.exe|23.50.228.129 |
|svchost.exe        |microsoftedgecp.exe|23.194.130.152|
|svchost.exe        |microsoftedgecp.exe|23.194.130.145|
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
|wmiprvse.exe       |powershell.exe     |10.0.10.106   |
+-------------------+-------------------+--------------+
only showing top 20 rows

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 5.05 s

In [17]:
%%time
process_network_df.groupBy('process_parent_name').count().sort('count', ascending=False).show()


+-------------------+-----+
|process_parent_name|count|
+-------------------+-----+
|        wscript.exe|   49|
|       wmiprvse.exe|   25|
|        svchost.exe|    9|
|       services.exe|    2|
+-------------------+-----+

CPU times: user 32 ms, sys: 80 ms, total: 112 ms
Wall time: 18.2 s

In [21]:
%%time
(process_network_df
            .filter(process_network_df
            .process_parent_name=="svchost.exe")
            .select("process_parent_command_line","process_name","dst_ip_addr")
            .show(5,truncate=False)
)


+------------------------------------------------+-------------------+--------------+
|process_parent_command_line                     |process_name       |dst_ip_addr   |
+------------------------------------------------+-------------------+--------------+
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe|13.107.21.200 |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe|13.107.21.200 |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe|204.79.197.200|
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe|72.30.2.182   |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe|204.79.197.203|
+------------------------------------------------+-------------------+--------------+
only showing top 5 rows

CPU times: user 12 ms, sys: 32 ms, total: 44 ms
Wall time: 14.1 s

ProcessCreate -> FileCreate

Let's now focus on the least frequent parent processes by sorting the counts in ascending order


In [22]:
process_file_df = processcreate_df.join(filecreate_df, "process_guid", how="inner")

In [23]:
%%time
process_file_df.groupBy('process_parent_name').count().sort('count').show()


+-------------------+-----+
|process_parent_name|count|
+-------------------+-----+
|       wmiprvse.exe|    3|
|        wscript.exe|    4|
|        svchost.exe|    5|
+-------------------+-----+

CPU times: user 0 ns, sys: 220 ms, total: 220 ms
Wall time: 24.6 s
