We are using Spark 2.1.0. If you are using Spark 1.6 or below, download the appropriate elasticsearch-hadoop connector (version 2.4).
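The elasticsearch-hadoop connector jar must also be visible to Spark before the SparkContext is created. A minimal sketch of one way to do that (the jar path and version below are placeholders, not taken from this notebook):

from pyspark import SparkConf

conf = SparkConf()
# "spark.jars" ships the listed jars to the driver and executors;
# use an ES-Hadoop 5.x jar with Spark 2.x, or a 2.4.x jar with Spark 1.6.
conf.set("spark.jars", "/path/to/elasticsearch-hadoop-5.1.2.jar")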
In [1]:
import os
import sys
sys.path.append(os.environ["SPARK_HOME"] + "/python/lib/py4j-0.10.4-src.zip")
sys.path.append(os.environ["SPARK_HOME"] + "/python/lib/pyspark.zip")
In [2]:
from pyspark import SparkConf, SparkContext
In [3]:
sconf = SparkConf()
sconf.setAppName("ES-Spark Integration")
sconf.setMaster("spark://snehasish-barmans-macbook.local:7077")
sc = SparkContext.getOrCreate(conf = sconf)
In [4]:
print sc
print sc.version
In [5]:
def getESQuery():
    query = """{
        "query": {
            "filtered": {
                "query": {
                    "match_all": {}
                },
                "filter": {
                    "bool": {
                        "must": [
                            {
                                "range": {
                                    "@timestamp": {
                                        "gte": "now-1h",
                                        "lt": "now"
                                    }
                                }
                            }
                        ],
                        "should": [
                            { "term": { "src_addr_space": "private" } },
                            { "term": { "dst_addr_space": "private" } }
                        ]
                    }
                }
            }
        },
        "_source": [ "netflow.ipv4_src_addr", "netflow.ipv4_dst_addr" ]
    }"""
    return query
In [6]:
ES_HOST = "192.168.19.156"
ES_PORT = "9200"
ES_INDEX = "seceon_flows"
ES_QUERY = getESQuery()
es_config = {"es.nodes": ES_HOST,
             "es.port": ES_PORT,
             "es.resource": ES_INDEX,
             "es.query": ES_QUERY,
             "es.nodes.discovery": "false",
             "es.nodes.wan.only": "true"
             }
In [7]:
# Read documents from Elasticsearch using the ES-Hadoop InputFormat
esrdd = sc.newAPIHadoopRDD(inputFormatClass = "org.elasticsearch.hadoop.mr.EsInputFormat",
                           keyClass = "org.apache.hadoop.io.NullWritable",
                           valueClass = "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                           conf = es_config)
ES returns a key-value RDD where the key is the document ID and the value is the content of the _source field: [("_id", {}), ...]
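A quick way to inspect that shape on your own data (the document ID and addresses will differ; this assumes esrdd from the previous cell and the _source fields selected by the query):

sample_id, sample_source = esrdd.first()
print sample_id                                   # Elasticsearch document _id
print sample_source["netflow"]["ipv4_src_addr"]   # e.g. u'10.0.0.5' (illustrative)
print sample_source["netflow"]["ipv4_dst_addr"]   # e.g. u'192.168.1.10' (illustrative)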
In [8]:
print esrdd.getNumPartitions()
esrdd = esrdd.coalesce(4)
print esrdd.getNumPartitions()
esrdd.persist()
print esrdd.is_cached
print esrdd.getStorageLevel()
In [11]:
rdd1 = esrdd.sample(False, 0.01).mapValues( lambda x: {"src": x["netflow"]["ipv4_src_addr"],
                                                       "dst": x["netflow"]["ipv4_dst_addr"]} )
In [13]:
rdd1.take(4)
Out[13]:
Every JSON document represents a connection between a source IP and a destination IP.
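A small sanity check of the mapped record shape (the values depend on your data; this assumes rdd1 from the cells above):

doc_id, flow = rdd1.first()
print doc_id                            # Elasticsearch document _id
print flow["src"], "->", flow["dst"]    # one connection: source IP -> destination IP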
In [14]:
rdd1.countApprox(1000, 0.1)
Out[14]:
In [15]:
print rdd1.flatMap( lambda x: [ x[1]["src"], x[1]["dst"] ] ).distinct().count()
print rdd1.flatMap( lambda x: [ x[1]["src"], x[1]["dst"] ] ).distinct().collect()
In [16]:
rdd_outgoing = rdd1.map( lambda x: (x[1]["src"], 1) ).reduceByKey( lambda acc, x: acc + x )  # connections initiated per source IP
In [17]:
rdd_outgoing.collect()
Out[17]:
In [18]:
rdd_incoming = rdd1.map( lambda x: (x[1]["dst"], 1) ).reduceByKey( lambda acc, x: acc + x )  # connections received per destination IP
rdd_incoming.collect()
Out[18]:
In [37]:
rdd_joined = rdd_outgoing.fullOuterJoin(rdd_incoming)  # (outgoing_count, incoming_count) per IP; None where an IP appears on only one side
rdd_joined.collect()
Out[37]:
In [38]:
rdd_joined = rdd_joined.mapValues(lambda x: tuple([y if y is not None else 0 for y in x]))  # replace None with 0
rdd_joined.collect()
Out[38]:
In [54]:
src_stats = rdd_joined.map(lambda x: x[1][0]).stats() # outgoing(src)
print src_stats
In [55]:
dst_stats = rdd_joined.map(lambda x: x[1][1]).stats() # incoming(dst)
print dst_stats
In [57]:
src_stats_bc = sc.broadcast(src_stats)
dst_stats_bc = sc.broadcast(dst_stats)
In [58]:
src_stats_bc.value
Out[58]:
In [64]:
rdd_joined.filter(lambda x: x[1][0] > 0).filter( lambda x: (x[1][0] - src_stats_bc.value.mean()) > (3*src_stats_bc.value.stdev()) ).collect()  # source IPs whose outgoing count is more than 3 standard deviations above the mean
Out[64]:
In [65]:
rdd_joined.filter(lambda x: x[1][1] > 0).filter( lambda x: (x[1][1] - dst_stats_bc.value.mean()) > (3*dst_stats_bc.value.stdev()) ).collect()  # destination IPs whose incoming count is more than 3 standard deviations above the mean
Out[65]:
In [66]:
src_stats_bc.unpersist()
dst_stats_bc.unpersist()
esrdd.unpersist()
print esrdd.is_cached
In [67]:
sc.stop()