GraphFrames is a package for Apache Spark which provides DataFrame-based Graphs.
In [1]:
from pyspark.sql import SparkSession
`SparkSession.builder.config(key=None, value=None, conf=None)` sets a configuration option on the session builder.
In [2]:
# Build the notebook's SparkSession against the HELK Spark master, with Hive
# support enabled. getOrCreate() hands back an already-running session when
# one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("HELK Graphs")
    .master("spark://helk-spark-master:7077")
    .enableHiveSupport()
    .getOrCreate()
)
In [3]:
# Bare expression: the notebook renders the SparkSession object as this cell's output.
spark
Out[3]:
In [4]:
from graphframes import *
from pyspark.sql.functions import *
In [5]:
%%time
# Create a Vertex DataFrame with unique ID column "id"# Creat
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
], ["src", "dst", "relationship"])
# Create a GraphFrame
from graphframes import *
g = GraphFrame(v, e)
# Query: Get in-degree of each vertex.
g.inDegrees.show()
# Query: Count the number of "follow" connections in the graph.
g.edges.filter("relationship = 'follow'").count()
Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
In [6]:
# Reusable DataFrame reader for the Elasticsearch Spark SQL data source.
# All connector settings are passed in one options() call; keys and values are
# plain strings, applied in the order listed.
es_reader = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .options(**{
        "inferSchema": "true",
        "es.read.field.as.array.include": "tags",
        "es.nodes": "helk-elasticsearch:9200",
        "es.net.http.auth.user": "elastic",
    })
)
# PLEASE REMEMBER!
# If you are using the Elastic TRIAL license, you also need to set the
# es.net.http.auth.pass config option.
# Example: add "es.net.http.auth.pass": "elasticpassword" to the options above
# (prefer reading the password from an environment variable over hardcoding it).
`load(path=None, format=None, schema=None, **options)` loads data from a data source and returns it as a DataFrame.
In [7]:
%%time
sysmon_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")
We are going to rename the column `process_guid` to `id`, because that's the column name GraphFrames uses for the row (vertex) IDs.
In [8]:
# Vertex table: one row per "processcreate" event, with the process GUID
# exposed under the column name "id" alongside the descriptive columns.
vertices = (
    sysmon_df
    .filter(sysmon_df.action == "processcreate")
    .select(
        sysmon_df.process_guid.alias("id"),
        "user_name",
        "host_name",
        "process_parent_name",
        "process_name",
        "action",
    )
)
In [9]:
%%time
vertices.show(5,truncate=False)
We are also going to rename `process_parent_guid` to `src` and `process_guid` to `dst`, the column names used for edges. This lets us look for that parent-to-child process relationship across our whole environment.
In [10]:
# Edge table: for each "processcreate" event, an edge from the parent process
# GUID ("src") to the created process GUID ("dst"), labelled "spawned".
edges = (
    sysmon_df
    .where(sysmon_df.action == "processcreate")
    .select(
        sysmon_df.process_parent_guid.alias("src"),
        sysmon_df.process_guid.alias("dst"),
        lit("spawned").alias("relationship"),
    )
)
In [11]:
%%time
edges.show(5,truncate=False)
In [12]:
# Assemble the process graph from the sysmon-derived vertices ("id" column)
# and edges ("src"/"dst" columns).
# NOTE: this rebinds `g`, replacing the toy example graph built earlier in the notebook.
g = GraphFrame(vertices, edges)
In [13]:
%%time
# Motif search for two-hop chains: an edge from vertex `a` to vertex `b`
# followed by an edge from `b` to `c` (both edges left anonymous with `[]`).
# With edges running parent GUID -> created process GUID, each match is a
# process `a` that spawned `b`, which in turn spawned `c`.
motifs = g.find("(a)-[]->(b);(b)-[]->(c)")
In [14]:
%%time
(motifs
.select("a.process_parent_name","a.process_name","b.process_name","c.process_name")
.show(20,truncate=False)
)
In [15]:
%%time
motifs.groupby('a.process_parent_name').count().sort('count').show(10)