In [1]:
from pyspark.sql import SparkSession
`config(key=None, value=None, conf=None)`: sets a configuration option on the `SparkSession` builder.
In [2]:
# Create (or reuse) the notebook's Spark session, named "HELK Reader",
# attached to the master at helk-spark-master:7077 with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("HELK Reader")
    .master("spark://helk-spark-master:7077")
    .enableHiveSupport()
    .getOrCreate()
)
In [3]:
# Bare expression: the notebook shows the session object as this cell's output.
spark
Out[3]:
Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
In [4]:
# Reader configured for the HELK Elasticsearch node through the
# elasticsearch-hadoop Spark SQL data source. The index pattern to read is
# supplied later, when es_reader.load(...) is called.
es_reader = spark.read.format("org.elasticsearch.spark.sql").options(
    **{
        "inferSchema": "true",
        # Treat the "tags" field as an array when reading.
        "es.read.field.as.array.include": "tags",
        "es.nodes": "helk-elasticsearch:9200",
        "es.net.http.auth.user": "elastic",
    }
)
# PLEASE REMEMBER:
# If you are using an Elastic TRIAL license, you also need to set the
# es.net.http.auth.pass config option.
# Example: add "es.net.http.auth.pass": "elasticpassword" to the mapping above.
`load(path=None, format=None, schema=None, **options)`
Loads data from a data source and returns it as a `DataFrame`.
In [5]:
%%time
# Load the Sysmon endpoint indices selected by the wildcard pattern.
sysmon_df = es_reader.load(path="logs-endpoint-winevent-sysmon-*/")
Filter the data to show only certain fields and only events with the action "processcreate", which is Sysmon Event ID 1.
In [6]:
# Keep only the rows whose action is "processcreate".
processcreate_df = sysmon_df.filter(sysmon_df["action"] == "processcreate")
You can select a few columns from your dataframe with the select method.
In [7]:
# Narrow the process-creation events to the fields used later in the notebook.
processcreate_df = processcreate_df.select(
    "process_guid",
    "process_parent_name",
    "process_parent_command_line",
    "process_name",
    "process_command_line",
    "action",
    "@timestamp",
)
In [8]:
%%time
# Preview the first 10 process-creation rows.
processcreate_df.show(n=10)
We are going to use the network events logged by Sysmon (Event ID 3)
In [9]:
# Keep only the rows whose action is "networkconnect".
networkconnect_df = sysmon_df.filter(sysmon_df["action"] == "networkconnect")
In [10]:
# Keep the join key plus the destination fields of each network connection.
networkconnect_df = networkconnect_df.select(
    "process_guid",
    "dst_ip_addr",
    "dst_port",
    "dst_host_name",
    "action",
    "@timestamp",
)
In [11]:
# Preview the first 10 network-connection rows without truncating cell values.
networkconnect_df.show(n=10, truncate=False)
In [12]:
# Keep only the rows whose action is "filecreate".
filecreate_df = sysmon_df.filter(sysmon_df["action"] == "filecreate")
In [13]:
# Keep the join key, the created file's name, and the event metadata.
filecreate_df = filecreate_df.select(
    "process_guid",
    "file_name",
    "action",
    "@timestamp",
)
In [14]:
%%time
# Preview the first 10 file-creation rows without truncating cell values.
filecreate_df.show(n=10, truncate=False)
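`join(other, on=None, how=None)`
Joins with another DataFrame, using the given join expression.
Parameters:
- `other`: the right side of the join.
- `on`: a string for the join column name, a list of column names, a join expression (Column), or a list of Columns.
- `how`: the join type as a string; defaults to `inner`.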
In [15]:
# Pair each process-creation event with the network connections that share
# its process_guid. Both inputs were also selected with "action" and
# "@timestamp", so those two names occur twice in the joined frame; only
# unambiguous columns should be referenced downstream.
process_network_df = processcreate_df.join(
    other=networkconnect_df,
    on="process_guid",
    how="inner",
)
In [16]:
%%time
# Show parent process, process, and destination IP for the joined events.
(
    process_network_df
    .select("process_parent_name", "process_name", "dst_ip_addr")
    .show(truncate=False)
)
In [17]:
%%time
# Count joined events per parent process, most frequent first.
(
    process_network_df
    .groupBy("process_parent_name")
    .count()
    .sort("count", ascending=False)
    .show()
)
In [21]:
%%time
# Drill into joined events whose parent process is svchost.exe and show the
# first 5 rows without truncating cell values.
(
    process_network_df
    .filter(process_network_df["process_parent_name"] == "svchost.exe")
    .select("process_parent_command_line", "process_name", "dst_ip_addr")
    .show(n=5, truncate=False)
)
Let's focus now on the least frequent events
In [22]:
# Pair each process-creation event with the file-creation events that share
# its process_guid. Both inputs were also selected with "action" and
# "@timestamp", so those two names occur twice in the joined frame; only
# unambiguous columns should be referenced downstream.
process_file_df = processcreate_df.join(
    other=filecreate_df,
    on="process_guid",
    how="inner",
)
In [23]:
%%time
# Count joined events per parent process, sorted by count with no explicit
# direction (the default order), so the least frequent parents come first.
(
    process_file_df
    .groupBy("process_parent_name")
    .count()
    .sort("count")
    .show()
)
In [ ]: