Introduction to Elasticsearch and Spark SQL via PySpark


  • Practice Spark SQL via PySpark skills
  • Ensure JupyterLab Server, Spark Cluster & Elasticsearch are communicating
  • Learn to read from HELK Elasticsearch indices

Import SparkSession Class

from pyspark.sql import SparkSession

Create a SparkSession instance

  • Define a spark variable
  • Pass values to the appName and master functions
    • For the master function, we are going to use the HELK's Spark Master container (helk-spark-master)
  • This time add the config() function to set Elasticsearch information needed to read from it

config(key=None, value=None, conf=None)

  • Sets a config option.
  • Options set using this method are automatically propagated to both SparkConf and SparkSession’s own configuration.
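As a rough sketch of how config() could carry the Elasticsearch details mentioned above, the helper below applies a dictionary of settings to a builder one key at a time. The keys es.nodes, es.port, and es.net.http.auth.user are elasticsearch-hadoop settings. The host name, port, and user values, and the helper name apply_settings, are assumptions made for illustration and are not taken from this notebook.

```python
# Assumed values for illustration only; adjust to your own HELK deployment.
ES_SETTINGS = {
    "es.nodes": "helk-elasticsearch",    # hypothetical Elasticsearch host name
    "es.port": "9200",                   # default Elasticsearch HTTP port
    "es.net.http.auth.user": "elastic",  # hypothetical user name
}


def apply_settings(builder, settings):
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder


# Usage (requires pyspark and a reachable Spark master):
#   from pyspark.sql import SparkSession
#   builder = SparkSession.builder.appName("HELK Reader")
#   spark = apply_settings(builder, ES_SETTINGS).getOrCreate()
```

Keeping the settings in one dictionary makes it easy to see exactly which options reach the session before getOrCreate() is called.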

spark = SparkSession.builder \
    .appName("HELK Reader") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

Check the SparkSession variable

spark

SparkSession - hive
Spark UI
AppName: HELK Reader

Read data from the HELK Elasticsearch via Spark SQL

Using the DataFrame API to access an Elasticsearch index (Elasticsearch-Sysmon Index)

  • Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
  • Elasticsearch becomes a native source for Spark SQL, so data can be indexed and queried from Spark SQL transparently.
  • Spark SQL works with structured data. In other words, all entries are expected to have the same structure (same number of fields, of the same type and name).
  • Using unstructured data (documents with different structures) is not supported and will cause problems.
  • Through the org.elasticsearch.spark.sql package, esDF methods are available on the SQLContext API
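To make the "same structure" requirement concrete, here is a small plain-Python sketch that checks whether a batch of documents shares the same field names and value types. It only illustrates the idea and is not how the connector itself validates data. The function names and the sample documents are hypothetical.

```python
def field_signature(doc):
    """Return the set of (field name, type name) pairs for one document."""
    return frozenset((name, type(value).__name__) for name, value in doc.items())


def has_uniform_structure(docs):
    """True when every document has the same fields with the same types."""
    signatures = {field_signature(doc) for doc in docs}
    return len(signatures) <= 1


# Hypothetical documents, not taken from the HELK indices.
uniform = [
    {"action": "networkconnect", "dst_port": 443},
    {"action": "networkconnect", "dst_port": 5985},
]
mixed = [
    {"action": "networkconnect", "dst_port": 443},
    {"action": "networkconnect", "dst_port": "5985", "extra": True},
]

print(has_uniform_structure(uniform))  # True
print(has_uniform_structure(mixed))    # False
```

The second batch fails because dst_port changes type and an extra field appears, which is the kind of mismatch the bullets above warn about.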


es_reader = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("", "tags")
    # If you are using an Elastic TRIAL license, then you need the config option set
    # Example: .option("","elasticpassword")
)
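The reader setup can also be written as a small helper that applies options from a dictionary. This is a sketch under stated assumptions: the source does not show which option keys it used, so the keys es.read.field.as.array.include and es.net.http.auth.pass below are guesses based on elasticsearch-hadoop setting names, and the helper name build_es_reader is hypothetical.

```python
def build_es_reader(spark, options):
    """Return a DataFrameReader for the Elasticsearch source with options applied."""
    reader = spark.read.format("org.elasticsearch.spark.sql")
    for key, value in options.items():
        reader = reader.option(key, value)
    return reader


# Assumed option keys (elasticsearch-hadoop settings); the source does not
# show which keys it used, so treat them as illustrative guesses.
READ_OPTIONS = {
    "inferSchema": "true",
    "es.read.field.as.array.include": "tags",  # read "tags" as an array field
    # "es.net.http.auth.pass": "elasticpassword",  # only when authentication is enabled
}

# Usage (requires a running SparkSession named spark):
#   es_reader = build_es_reader(spark, READ_OPTIONS)
#   sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")
```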

load(path=None, format=None, schema=None, **options)

  • Loads data from a data source and returns it as a DataFrame.

sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")

Filter Operation

Filter the data to show only events with the action "processcreate", which corresponds to Sysmon Event ID 1.

processcreate_df = sysmon_df.filter(sysmon_df.action == "processcreate")
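DataFrame.filter also accepts a SQL expression string, which can be handy when the action name comes from a variable. The helper below is a minimal sketch. The name action_condition is hypothetical, and it simply rejects values containing a single quote instead of trying to escape them.

```python
def action_condition(action):
    """Build a SQL condition string that matches a single action value."""
    if "'" in action:
        raise ValueError("action must not contain a single quote")
    return "action = '{}'".format(action)


print(action_condition("processcreate"))  # action = 'processcreate'

# Usage (requires the sysmon_df DataFrame loaded earlier):
#   processcreate_df = sysmon_df.filter(action_condition("processcreate"))
```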

Select Operation

You can select a few columns from your dataframe with the select method.

processcreate_df.select("process_guid","process_parent_name","process_parent_command_line","process_name","process_command_line","action","@timestamp").show(10)

|        process_guid|process_parent_name|process_parent_command_line|        process_name|process_command_line|       action|          @timestamp|
|aa6b4a20-7cd9-5ce...|        svchost.exe|       c:\windows\system...|backgroundtaskhos...|"c:\windows\syste...|processcreate|2019-05-18 21:44:...|
|aa6b4a20-7cdf-5ce...|     powershell.exe|       c:\windows\system...|         conhost.exe|\??\c:\windows\sy...|processcreate|2019-05-18 21:45:...|
|aa6b4a20-7d15-5ce...|        svchost.exe|       c:\windows\system...|backgroundtaskhos...|"c:\windows\syste...|processcreate|2019-05-18 21:45:...|
|aa6b4a20-7d16-5ce...|        svchost.exe|       c:\windows\system...|   runtimebroker.exe|c:\windows\system...|processcreate|2019-05-18 21:45:...|
|03ba39f5-7d20-5ce...|        svchost.exe|       c:\windows\system...|        gpupdate.exe|gpupdate.exe /tar...|processcreate|2019-05-18 21:46:...|
|03ba39f5-7d20-5ce...|       gpupdate.exe|       gpupdate.exe /tar...|         conhost.exe|\??\c:\windows\sy...|processcreate|2019-05-18 21:46:...|
|03ba39f5-7d20-5ce...|       services.exe|       c:\windows\system...|         svchost.exe|c:\windows\system...|processcreate|2019-05-18 21:46:...|
|aa6b4a20-7ce9-5ce...|        svchost.exe|       c:\windows\system...|       taskhostw.exe|taskhostw.exe key...|processcreate|2019-05-18 21:45:...|
|aa6b4a20-7cea-5ce...|        svchost.exe|       c:\windows\system...|         dllhost.exe|c:\windows\system...|processcreate|2019-05-18 21:45:...|
|aa6b4a20-7cea-5ce...|        svchost.exe|       c:\windows\system...|       taskhostw.exe|       taskhostw.exe|processcreate|2019-05-18 21:45:...|
only showing top 10 rows

Create Dataframes from the original Sysmon Dataframe

  • Filter the original sysmon_df dataframe
  • Select specific columns
  • Display results
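The three steps above can be sketched as one helper that is reused for each event type. The column lists are the ones shown for each event type in this notebook. The names EVENT_COLUMNS and event_view are hypothetical, and the helper assumes a PySpark DataFrame with an action column.

```python
# Columns displayed for each event type in this notebook.
EVENT_COLUMNS = {
    "processcreate": ["process_guid", "process_parent_name",
                      "process_parent_command_line", "process_name",
                      "process_command_line", "action", "@timestamp"],
    "networkconnect": ["process_guid", "dst_ip_addr", "dst_port",
                       "dst_host_name", "action", "@timestamp"],
    "filecreate": ["process_guid", "file_name", "action", "@timestamp"],
}


def event_view(df, action):
    """Filter df to one action, then keep only that action's columns."""
    filtered = df.filter("action = '{}'".format(action))
    return filtered.select(*EVENT_COLUMNS[action])


# Usage (requires the sysmon_df DataFrame loaded earlier):
#   event_view(sysmon_df, "networkconnect").show(10, truncate=False)
```

Note that the views share the action and @timestamp columns, so joining two of them on process_guid would produce duplicate column names; the notebook instead joins the filtered DataFrames and selects columns afterwards.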

NetworkConnect Events

We are going to use the network events logged by Sysmon (Event ID 3)

networkconnect_df = sysmon_df.filter(sysmon_df.action == "networkconnect")

networkconnect_df.select("process_guid","dst_ip_addr","dst_port","dst_host_name","action","@timestamp").show(10, truncate=False)

|process_guid                        |dst_ip_addr  |dst_port|dst_host_name|action        |@timestamp             |
|03ba39f5-50b2-5ce0-0000-00109995c501|  |443     |null         |networkconnect|2019-05-18 21:44:43.063|
|aa6b4a20-7b8d-5ce0-0000-001028031c00|  |443     |null         |networkconnect|2019-05-18 21:44:51.333|
|905CC552-2045-5CC5-0000-00105B2A0100||5985    |null         |networkconnect|2019-05-18 21:44:53.257|
|03ba39f5-652c-5ce0-0000-0010760bff01|  |443     |null         |networkconnect|2019-05-18 21:44:53.484|
|03ba39f5-6e79-5ce0-0000-001032d21002|  |443     |null         |networkconnect|2019-05-18 21:44:58.094|
|03ba39f5-50b2-5ce0-0000-00109995c501|  |443     |null         |networkconnect|2019-05-18 21:44:58.609|
|03ba39f5-652c-5ce0-0000-0010760bff01|  |443     |null         |networkconnect|2019-05-18 21:44:58.609|
|03ba39f5-ea63-5ccb-0000-001050e60000||135     |it001        |networkconnect|2019-05-18 21:45:03.297|
|03ba39f5-6e79-5ce0-0000-001032d21002|  |443     |null         |networkconnect|2019-05-18 21:45:03.562|
|03ba39f5-652c-5ce0-0000-0010760bff01|  |443     |null         |networkconnect|2019-05-18 21:45:03.812|
only showing top 10 rows

FileCreate Events

filecreate_df = sysmon_df.filter(sysmon_df.action == "filecreate")

filecreate_df.select("process_guid","file_name","action","@timestamp").show(10, truncate=False)

|process_guid                        |file_name                                                                                     |action    |@timestamp             |
|aa6b4a20-7cde-5ce0-0000-00109ea71e00|c:\users\pgustavo\appdata\local\temp\__psscriptpolicytest_kld4kxox.voz.ps1                    |filecreate|2019-05-18 21:45:04.958|
|aa6b4a20-7cde-5ce0-0000-00109ea71e00|c:\users\pgustavo\appdata\local\temp\__psscriptpolicytest_4ksn3cia.csg.psm1                   |filecreate|2019-05-18 21:45:04.958|
|905CC552-2042-5CC5-0000-00103D150100|c:\windows\serviceprofiles\localservice\appdata\local\lastalive1.dat                          |filecreate|2019-05-18 21:45:11.649|
|aa6b4a20-7719-5ce0-0000-001068a30000|c:\windows\temp\his33a6.tmp                                                                   |filecreate|2019-05-18 21:45:11.796|
|aa6b4a20-7735-5ce0-0000-001033f10100|c:\windows\system32\sleepstudy\screenon\screenonpowerstudytracesession-2019-05-18-17-45-11.etl|filecreate|2019-05-18 21:45:11.99 |
|aa6b4a20-7719-5ce0-0000-001068a30000|c:\windows\temp\hisf7a6.tmp                                                                   |filecreate|2019-05-18 21:44:56.433|
|aa6b4a20-7cde-5ce0-0000-00109ea71e00|c:\users\pgustavo\documents\20190518\powershell_transcript.it001.rhzmf_up.20190518174505.txt  |filecreate|2019-05-18 21:45:05.573|
|aa6b4a20-771f-5ce0-0000-00108a420100|c:\windows\prefetch\                                                |filecreate|2019-05-18 21:45:15.754|
|aa6b4a20-771f-5ce0-0000-00108a420100|c:\windows\prefetch\                                                   |filecreate|2019-05-18 21:45:26.147|
|aa6b4a20-771f-5ce0-0000-00108a420100|c:\windows\prefetch\                                                  |filecreate|2019-05-18 21:45:15.648|
only showing top 10 rows

Spark SQL JOINs & Sysmon Data

join(other, on=None, how=None)

Joins with another DataFrame, using the given join expression.


  • other – Right side of the join
  • on – a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
  • how – str, default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
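To see what an inner equi-join on a single column does, here is a plain-Python sketch of the same logic. It is not Spark code, only an illustration of the semantics: every left row is paired with every right row that has the same key, so one process with several network connections yields several joined rows. The rows below are made up.

```python
def inner_join(left_rows, right_rows, key):
    """Pair every left row with every right row that has the same key value."""
    by_key = {}
    for row in right_rows:
        by_key.setdefault(row[key], []).append(row)
    joined = []
    for left in left_rows:
        for right in by_key.get(left[key], []):
            merged = dict(left)
            merged.update(right)
            joined.append(merged)
    return joined


# Hypothetical rows; the GUIDs and ports are made up for illustration.
processes = [
    {"process_guid": "guid-1", "process_name": "powershell.exe"},
    {"process_guid": "guid-2", "process_name": "notepad.exe"},
]
connections = [
    {"process_guid": "guid-1", "dst_port": 443},
    {"process_guid": "guid-1", "dst_port": 5985},
]

for row in inner_join(processes, connections, "process_guid"):
    print(row["process_name"], row["dst_port"])
# powershell.exe 443
# powershell.exe 5985
```

The process with no matching connection (guid-2) is dropped, which is what distinguishes an inner join from a left join.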

ProcessCreate -> NetworkConnect

process_network_df = processcreate_df.join(networkconnect_df, "process_guid", how="inner")

|process_parent_name|process_name       |dst_ip_addr   |
|svchost.exe        |microsoftedgecp.exe| |
|svchost.exe        |microsoftedgecp.exe| |
|svchost.exe        |microsoftedgecp.exe||
|svchost.exe        |microsoftedgecp.exe|   |
|svchost.exe        |microsoftedgecp.exe||
|svchost.exe        |microsoftedgecp.exe| |
|svchost.exe        |microsoftedgecp.exe||
|svchost.exe        |microsoftedgecp.exe||
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
|wmiprvse.exe       |powershell.exe     |   |
only showing top 20 rows

process_network_df.groupBy('process_parent_name').count().sort('count', ascending=False).show()

|        wscript.exe|   49|
|       wmiprvse.exe|   25|
|        svchost.exe|    9|
|       services.exe|    2|
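The groupBy, count, and sort chain is the distributed version of a simple frequency count. As a plain-Python illustration of the same idea, using made-up parent process names:

```python
from collections import Counter

# Hypothetical parent process names, one per joined row.
parents = ["wscript.exe", "svchost.exe", "wscript.exe",
           "wmiprvse.exe", "wscript.exe", "wmiprvse.exe"]

counts = Counter(parents)
# Sort by count, largest first, like sort('count', ascending=False).
most_frequent = sorted(counts.items(), key=lambda item: item[1], reverse=True)
print(most_frequent)
# [('wscript.exe', 3), ('wmiprvse.exe', 2), ('svchost.exe', 1)]
```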

|process_parent_command_line                     |process_name       |dst_ip_addr   |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe| |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe| |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe||
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe|   |
|c:\windows\system32\svchost.exe -k dcomlaunch -p|microsoftedgecp.exe||
only showing top 5 rows

ProcessCreate -> FileCreate

Let's focus now on the least frequent events

process_file_df = processcreate_df.join(filecreate_df, "process_guid", how="inner")

|       wmiprvse.exe|    3|
|        wscript.exe|    4|
|        svchost.exe|    5|
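Counts in ascending order like these can be produced by sorting the grouped counts with ascending=True. The helper below is a sketch that assumes a PySpark DataFrame. The name least_frequent and the default limit are hypothetical.

```python
def least_frequent(df, column, n=10):
    """Count rows per value of column and return the n rarest values first."""
    return df.groupBy(column).count().sort("count", ascending=True).limit(n)


# Usage (requires the process_file_df DataFrame from the join above):
#   least_frequent(process_file_df, "process_parent_name").show()
```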

