In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("HELK JOIN") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes","helk-elasticsearch:9200")
    .option("es.net.http.auth.user","elastic")
)

In [4]:
security_df = es_reader.load("logs-endpoint-winevent-security-*/")

In [5]:
security_df.createOrReplaceTempView("security_events")

In [6]:
security_4624_3 = spark.sql(
    '''
    SELECT event_id,
        host_name,
        src_ip_addr,
        user_logon_id,
        user_name,
        logon_type,
        `@timestamp`
    FROM security_events
    WHERE event_id = 4624
        AND logon_type = 3
        AND src_ip_addr is not null
        AND `@timestamp` BETWEEN "2019-05-18 00:00:00.000" AND "2019-05-19 00:00:00.000"
    '''
)

In [7]:
security_4624_3.createOrReplaceTempView("security_4624_3")

In [8]:
sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")

In [9]:
sysmon_df.createOrReplaceTempView("sysmon_events")

In [10]:
sysmon_processcreate = spark.sql(
    '''
    SELECT event_id,
        host_name,
        process_parent_name,
        process_parent_guid,
        process_parent_command_line,
        process_name,
        process_guid,
        process_command_line,
        user_logon_id,
        `@timestamp`
    FROM sysmon_events
    WHERE event_id = 1
        AND `@timestamp` BETWEEN "2019-05-18 00:00:00.000" AND "2019-05-19 00:00:00.000"
    '''
)

In [11]:
sysmon_processcreate.createOrReplaceTempView("sysmon_1")

In [13]:
security_sysmon_join = spark.sql(
    '''
    SELECT s.`@timestamp`,
        s.host_name,
        s.src_ip_addr,
        s.logon_type,
        s.user_logon_id,
        s.user_name,
        p.process_parent_name,
        p.process_parent_guid,
        p.process_parent_command_line,
        p.process_name,
        p.process_guid,
        p.process_command_line
    FROM security_4624_3 s
    INNER JOIN sysmon_1 p
        ON s.user_logon_id = p.user_logon_id
    '''
)

In [14]:
security_sysmon_join.select(
    "@timestamp","src_ip_addr","host_name","user_name","process_parent_name","process_parent_command_line","process_name","process_command_line"
).show()


+--------------------+-------------+---------------+---------+-------------------+---------------------------+--------------+--------------------+
|          @timestamp|  src_ip_addr|      host_name|user_name|process_parent_name|process_parent_command_line|  process_name|process_command_line|
+--------------------+-------------+---------------+---------+-------------------+---------------------------+--------------+--------------------+
|2019-05-18 21:45:...|172.18.39.106|IT001.shire.com| pgustavo|       wmiprvse.exe|       c:\windows\system...|powershell.exe|c:\windows\system...|
|2019-05-18 21:45:...|172.18.39.106|IT001.shire.com| pgustavo|     powershell.exe|       c:\windows\system...|   conhost.exe|\??\c:\windows\sy...|
|2019-05-18 21:45:...|172.18.39.106|IT001.shire.com| pgustavo|     powershell.exe|       c:\windows\system...|    whoami.exe|"c:\windows\syste...|
+--------------------+-------------+---------------+---------+-------------------+---------------------------+--------------+--------------------+


In [ ]: