In [1]:
import os

from pyspark.sql import SparkSession
In [2]:
# Create (or reuse, if one already exists in this kernel) the Spark session
# bound to the HELK standalone Spark master, with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("HELK Reader")
    .master("spark://helk-spark-master:7077")
    .enableHiveSupport()
    .getOrCreate()
)
In [3]:
# Elasticsearch connection settings. These can be overridden with environment
# variables (names chosen for this notebook) so that credentials never have
# to be typed into the notebook or end up in its saved outputs.
ES_NODES = os.environ.get("HELK_ES_NODES", "helk-elasticsearch:9200")
ES_USER = os.environ.get("HELK_ES_USER", "elastic")
# A password is required when Elasticsearch enforces authentication, e.g. with
# an Elastic TRIAL license. Set HELK_ES_PASSWORD in the Jupyter server's
# environment instead of hardcoding it here. When it is unset, no password
# option is sent, which matches the previous behavior.
ES_PASSWORD = os.environ.get("HELK_ES_PASSWORD")

# Reusable reader for the Elasticsearch-Spark connector. Each later
# `es_reader.load(<index pattern>)` call builds a DataFrame from it.
es_reader = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    # NOTE(review): es-hadoop settings are "es."-prefixed; "inferSchema" looks
    # like a file-source option and may be ignored here. Kept pending confirmation.
    .option("inferSchema", "true")
    # Read the "tags" field as an array column.
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes", ES_NODES)
    .option("es.net.http.auth.user", ES_USER)
)
if ES_PASSWORD:
    es_reader = es_reader.option("es.net.http.auth.pass", ES_PASSWORD)
In [4]:
%%time
sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")
In [5]:
# Expose the Sysmon DataFrame to Spark SQL under the name used by the queries below.
sysmon_df.createOrReplaceTempView(name="sysmon_events")
In [6]:
## Hunt for PowerShell activity with Spark SQL queries
In [ ]:
# Sysmon event ID 1 (process creation) where powershell.exe was started by a
# parent other than explorer.exe.
# - Names are compared with lower(), like the other Sysmon queries in this
#   notebook, because Windows file names are case-insensitive
#   ("PowerShell.exe" must match as well).
# - The null-safe `<=>` keeps events with a missing parent name. A plain
#   `NOT x = y` evaluates to NULL when x is NULL, which would silently
#   drop those rows.
sysmon_ps_execution = spark.sql(
    '''
    SELECT event_id, process_parent_name, process_name
    FROM sysmon_events
    WHERE event_id = 1
      AND lower(process_name) = "powershell.exe"
      AND NOT (lower(process_parent_name) <=> "explorer.exe")
    '''
)
sysmon_ps_execution.show(10)
In [ ]:
# Sysmon event ID 7 (image loaded) for the PowerShell engine assembly
# System.Management.Automation, loaded by any process.
#
# The module-path test uses instr() instead of LIKE. LIKE uses backslash as
# its escape character, and Spark rejects a pattern where the escape precedes
# anything other than `_`, `%` or another escape. The original pattern
# '%\system.management.automation%' therefore failed.
# Escaping: the Python text "\\\\" becomes SQL source "\\", which the SQL
# parser (default settings) turns into one literal backslash. That is the
# path separator in front of the module file name.
sysmon_ps_module = spark.sql(
    '''
    SELECT event_id, process_name
    FROM sysmon_events
    WHERE event_id = 7
      AND (
        lower(file_description) = "system.management.automation"
        OR instr(lower(module_loaded), "\\\\system.management.automation") > 0
      )
    '''
)
sysmon_ps_module.show(10)
In [ ]:
# Sysmon event ID 17 (pipe created) for pipes whose name starts with a
# literal "\pshost", compared case-insensitively.
#
# `instr(...) = 1` is a prefix test: the substring's first occurrence is at
# position 1. It replaces LIKE "\pshost%", which Spark rejects because the
# LIKE escape character (backslash) may not precede 'p'.
# Escaping: Python "\\\\" -> SQL source "\\" -> one literal backslash
# (default SQL parser settings).
sysmon_ps_pipe = spark.sql(
    '''
    SELECT event_id, process_name
    FROM sysmon_events
    WHERE event_id = 17
      AND instr(lower(pipe_name), "\\\\pshost") = 1
    '''
)
sysmon_ps_pipe.show(10)
In [ ]:
%%time
powershell_df = es_reader.load("logs-endpoint-winevent-powershell-*/")
In [ ]:
# Expose the PowerShell DataFrame to Spark SQL under the name used by the query below.
powershell_df.createOrReplaceTempView(name="powershell_events")
In [ ]:
# PowerShell events with ID 53504.
# NOTE(review): 53504 is believed to be the PowerShell "IPC listening thread"
# (named pipe) event, which matches this cell's variable name. Confirm.
# Only event_id is selected, so the output just confirms such events exist.
ps_ipc_query = '''
SELECT event_id
FROM powershell_events
WHERE event_id = 53504
'''
ps_named_pipe = spark.sql(ps_ipc_query)
ps_named_pipe.show(10)
In [ ]: