Spark SQL Tables via PySpark


Goals:

  • Practice Spark SQL skills via PySpark
  • Ensure the JupyterLab server, Spark cluster & Elasticsearch are communicating
  • Practice query execution via PySpark
  • Create a template for future queries

Import SparkSession Class


In [1]:
from pyspark.sql import SparkSession

Create a SparkSession instance


In [2]:
spark = SparkSession.builder \
    .appName("HELK Reader") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

Read data from the HELK Elasticsearch via Spark SQL


In [3]:
es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes", "helk-elasticsearch:9200")
    .option("es.net.http.auth.user", "elastic")
)
# PLEASE REMEMBER!
# If you are using the Elastic TRIAL license, you also need to set the
# es.net.http.auth.pass config option.
# Example: .option("es.net.http.auth.pass", "elasticpassword")
Read Sysmon Events


In [4]:
%%time
sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")


CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 1.99 s

Register Sysmon SQL Temporary View


In [5]:
sysmon_df.createOrReplaceTempView("sysmon_events")

Run SQL Queries

In [ ]:
sysmon_ps_execution = spark.sql(
    '''
    SELECT event_id,process_parent_name,process_name
    FROM sysmon_events
    WHERE event_id = 1
        AND process_name = "powershell.exe"
        AND NOT process_parent_name = "explorer.exe"
    '''
)
sysmon_ps_execution.show(10)

In [ ]:
sysmon_ps_module = spark.sql(
    '''
    SELECT event_id,process_name
    FROM sysmon_events
    WHERE event_id = 7 
        AND (
            lower(file_description) = "system.management.automation"
            OR lower(module_loaded) LIKE "%\\\\system.management.automation%"
        ) 
    '''
)
sysmon_ps_module.show(10)

In [ ]:
sysmon_ps_pipe = spark.sql(
    '''
    SELECT event_id,process_name
    FROM sysmon_events
    WHERE event_id = 17
        AND lower(pipe_name) LIKE "\\\\pshost%"
    '''
)
sysmon_ps_pipe.show(10)
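The quadruple backslash in the LIKE pattern above can be surprising; each layer of interpretation strips one level of escaping. A quick plain-Python illustration:

```python
# Why four backslashes? Each layer halves them:
pattern = "\\\\pshost%"   # Python source: four backslashes
print(pattern)            # Spark SQL receives: \\pshost%
# In SQL LIKE, '\\' is an escaped literal backslash, so the pattern
# matches pipe names starting with a single '\', e.g. \PSHost...
```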

Read PowerShell Events


In [ ]:
%%time
powershell_df = es_reader.load("logs-endpoint-winevent-powershell-*/")

Register PowerShell SQL Temporary View


In [ ]:
powershell_df.createOrReplaceTempView("powershell_events")

In [ ]:
ps_named_pipe = spark.sql(
    '''
    SELECT event_id
    FROM powershell_events
    WHERE event_id = 53504
    '''
)
ps_named_pipe.show(10)
