Install ES-Hadoop connector

  1. Download ES-Hadoop for Spark 2.0+. We are using version 5.2.0.
  2. Add either elasticsearch-hadoop-5.2.0.jar or elasticsearch-spark-20_2.10-5.2.0.jar to the Spark classpath:
    • Edit the conf/spark-env.sh file to add the jar to the classpath (an alternative that passes the jar at PySpark startup is sketched below). For instance:
      • SPARK_CLASSPATH=${SPARK_HOME}/jars/elasticsearch-hadoop/elasticsearch-hadoop-5.2.0.jar
  3. Using PySpark with ES-Hadoop

We are using Spark 2.1.0. If you are using Spark 1.6 or below, download the appropriate Hadoop connector (2.4) instead.
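
If you prefer not to edit spark-env.sh, the connector jar can also be handed to PySpark at startup by setting PYSPARK_SUBMIT_ARGS before the SparkContext is created. A minimal sketch; the jar path is a placeholder for wherever you saved the connector:

import os

# Placeholder path -- point this at your local copy of the connector jar.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /path/to/elasticsearch-hadoop-5.2.0.jar pyspark-shell"
)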


In [1]:
import os
import sys

# Make the PySpark and Py4J libraries importable from this notebook's interpreter.
sys.path.append(os.environ["SPARK_HOME"] + "/python/lib/py4j-0.10.4-src.zip")
sys.path.append(os.environ["SPARK_HOME"] + "/python/lib/pyspark.zip")

In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
sconf = SparkConf()
sconf.setAppName("ES-Spark Integration")
sconf.setMaster("spark://snehasish-barmans-macbook.local:7077")

sc = SparkContext.getOrCreate(conf = sconf)

In [4]:
print sc
print sc.version


<pyspark.context.SparkContext object at 0x103f14dd0>
2.1.0

Goal: Identify connections whose counts deviate from the mean by more than 3 sigma
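
Concretely, an IP is flagged when its outgoing or incoming connection count c satisfies c - mean > 3 * stdev, where mean and stdev are computed over the per-IP counts (the filters at the end of this notebook implement exactly this).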

Construct ES Query

Query DSL (here we select connections between machines that operate within a restricted environment, i.e. where the source or destination address space is private)


In [5]:
def getESQuery():
    query = """{
        "query": {
                "filtered": {
                     "query": {
                         "match_all": {}
                      },
                      "filter": {
                          "bool": {
                             "must": [
                                 {
                                    "range": {
                                        "@timestamp" : {
                                           "gte" : "now-1h",
                                           "lt" : "now"
                                        }
                                    }
                                 }
                              ],
                              "should": [
                                 { "term": { "src_addr_space": "private" } },
                                 { "term": { "dst_addr_space": "private" } }
                              ]
                          }
                    }
                }
           },
        "_source": [ "netflow.ipv4_src_addr", "netflow.ipv4_dst_addr" ]
        }"""
    return query
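
Note: the filtered query used above was removed in Elasticsearch 5.0. If your cluster runs ES 5.x, an equivalent form moves everything into a bool query. The sketch below keeps the same time range and address-space terms, and adds minimum_should_match to make the "at least one private endpoint" intent explicit (an assumption about what the original should clauses were meant to do):

def getESQueryES5():
    # Same intent as getESQuery(), rewritten for ES 5.x where "filtered" no longer exists.
    query = """{
        "query": {
            "bool": {
                "filter": [
                    { "range": { "@timestamp": { "gte": "now-1h", "lt": "now" } } }
                ],
                "should": [
                    { "term": { "src_addr_space": "private" } },
                    { "term": { "dst_addr_space": "private" } }
                ],
                "minimum_should_match": 1
            }
        },
        "_source": [ "netflow.ipv4_src_addr", "netflow.ipv4_dst_addr" ]
    }"""
    return query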

Configure ES parameters

  • Mandatory: ES host (es.nodes), port (es.port), index (es.resource), and query (es.query)
  • If you are operating in a cloud/restricted environment and want to disable node discovery, set es.nodes.discovery and es.nodes.wan.only as shown below.
  • More info in the ES-Hadoop configuration documentation

In [6]:
ES_HOST = "192.168.19.156"
ES_PORT = "9200"
ES_INDEX = "seceon_flows"
ES_QUERY = getESQuery()

es_config = {"es.nodes": ES_HOST,
             "es.port": ES_PORT,
             "es.resource": ES_INDEX,
             "es.query": ES_QUERY,
             "es.nodes.discovery": "false",
             "es.nodes.wan.only": "true"
            }

In [7]:
esrdd = sc.newAPIHadoopRDD(inputFormatClass = "org.elasticsearch.hadoop.mr.EsInputFormat",
                           keyClass = "org.apache.hadoop.io.NullWritable",
                           valueClass = "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                           conf = es_config)

ES returns a key-value RDD where the key is the document ID and the value is the content of the _source field: [("_id", {...}), ...]
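
A quick way to confirm this structure before transforming it (an optional check, not part of the original flow):

doc_id, source = esrdd.first()
print doc_id    # e.g. u'AVoBSHYNfs3wJOjVaSze'
print source    # a dict holding the _source fields, e.g. {u'netflow': {...}}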


In [8]:
print esrdd.getNumPartitions()
esrdd = esrdd.coalesce(4)
print esrdd.getNumPartitions()

esrdd.persist()
print esrdd.is_cached
print esrdd.getStorageLevel()


169
4
True
Memory Serialized 1x Replicated

In [11]:
rdd1 = esrdd.sample(False, 0.01).mapValues( lambda x: {"src": x["netflow"]["ipv4_src_addr"],
                                                       "dst": x["netflow"]["ipv4_dst_addr"]} )

In [13]:
rdd1.take(4)


Out[13]:
[(u'AVoBSHYNfs3wJOjVaSze', {'dst': u'20.52.101.2', 'src': u'10.52.1.2'}),
 (u'AVoBSHYyfs3wJOjVaSzy', {'dst': u'20.52.101.3', 'src': u'10.52.1.3'}),
 (u'AVoBSMOtfs3wJOjVaS_8', {'dst': u'20.52.101.4', 'src': u'10.52.1.4'}),
 (u'AVoBSMPtfs3wJOjVaTBv', {'dst': u'20.52.101.6', 'src': u'10.52.1.6'})]

Every JSON document represents a connection between a source IP and a destination IP.


In [14]:
rdd1.countApprox(1000, 0.1)


Out[14]:
1430

List all the unique IP addresses


In [15]:
print rdd1.flatMap( lambda x: [ x[1]["src"], x[1]["dst"] ] ).distinct().count()
print rdd1.flatMap( lambda x: [ x[1]["src"], x[1]["dst"] ] ).distinct().collect()


26
[u'10.52.1.6', u'20.52.101.2', u'20.52.101.6', u'10.54.0.182', u'10.52.1.2', u'10.54.0.172', u'10.52.1.7', u'20.54.0.162', u'20.52.101.3', u'20.52.101.7', u'10.52.1.3', u'20.54.0.172', u'10.52.1.4', u'20.52.101.8', u'10.54.0.162', u'10.52.1.8', u'10.52.1.0', u'20.52.101.0', u'20.52.101.4', u'20.52.101.5', u'20.54.0.182', u'10.52.1.5', u'10.52.1.1', u'20.52.101.1', u'20.52.101.9', u'10.52.1.9']

Create two RDDs of per-IP connection counts

  • rdd_incoming: RDD of counts of incoming connections (received) for every IP
  • rdd_outgoing: RDD of counts of outgoing connections (sent) for every IP

In [16]:
rdd_outgoing = rdd1.map( lambda x: (x[1]["src"], 1) ).reduceByKey( lambda acc, x: acc + x )

In [17]:
rdd_outgoing.collect()


Out[17]:
[(u'10.52.1.6', 7),
 (u'10.52.1.2', 9),
 (u'10.52.1.7', 9),
 (u'10.52.1.3', 13),
 (u'20.54.0.162', 1253),
 (u'10.52.1.4', 13),
 (u'20.54.0.172', 73),
 (u'10.52.1.8', 12),
 (u'10.52.1.0', 7),
 (u'10.52.1.5', 7),
 (u'20.54.0.182', 6),
 (u'10.52.1.9', 13),
 (u'10.52.1.1', 8)]

In [18]:
rdd_incoming = rdd1.map( lambda x: (x[1]["dst"], 1) ).reduceByKey( lambda acc, x: acc + x )
rdd_incoming.collect()


Out[18]:
[(u'20.52.101.2', 9),
 (u'10.54.0.182', 6),
 (u'20.52.101.6', 7),
 (u'20.52.101.3', 13),
 (u'20.52.101.7', 9),
 (u'10.54.0.172', 73),
 (u'20.52.101.8', 12),
 (u'10.54.0.162', 1253),
 (u'20.52.101.0', 7),
 (u'20.52.101.4', 13),
 (u'20.52.101.9', 13),
 (u'20.52.101.1', 8),
 (u'20.52.101.5', 7)]

Join the two RDDs to create a dataframe-like structure


In [37]:
rdd_joined = rdd_outgoing.fullOuterJoin(rdd_incoming)
rdd_joined.collect()


Out[37]:
[(u'10.52.1.6', (7, None)),
 (u'20.52.101.2', (None, 9)),
 (u'10.52.1.2', (9, None)),
 (u'10.54.0.182', (None, 6)),
 (u'20.52.101.6', (None, 7)),
 (u'10.52.1.7', (9, None)),
 (u'10.54.0.172', (None, 73)),
 (u'20.54.0.162', (1253, None)),
 (u'20.52.101.3', (None, 13)),
 (u'20.52.101.7', (None, 9)),
 (u'10.52.1.3', (13, None)),
 (u'20.54.0.172', (73, None)),
 (u'20.52.101.8', (None, 12)),
 (u'10.54.0.162', (None, 1253)),
 (u'10.52.1.4', (13, None)),
 (u'10.52.1.8', (12, None)),
 (u'10.52.1.0', (7, None)),
 (u'20.52.101.0', (None, 7)),
 (u'20.52.101.4', (None, 13)),
 (u'20.54.0.182', (6, None)),
 (u'10.52.1.5', (7, None)),
 (u'20.52.101.9', (None, 13)),
 (u'10.52.1.9', (13, None)),
 (u'10.52.1.1', (8, None)),
 (u'20.52.101.1', (None, 8)),
 (u'20.52.101.5', (None, 7))]

Replace missing values with zeroes


In [38]:
rdd_joined = rdd_joined.mapValues(lambda x: tuple([y if y is not None else 0 for y in x]))
rdd_joined.collect()


Out[38]:
[(u'10.52.1.6', (7, 0)),
 (u'20.52.101.2', (0, 9)),
 (u'10.52.1.2', (9, 0)),
 (u'10.54.0.182', (0, 6)),
 (u'20.52.101.6', (0, 7)),
 (u'10.52.1.7', (9, 0)),
 (u'10.54.0.172', (0, 73)),
 (u'20.54.0.162', (1253, 0)),
 (u'20.52.101.3', (0, 13)),
 (u'20.52.101.7', (0, 9)),
 (u'10.52.1.3', (13, 0)),
 (u'20.54.0.172', (73, 0)),
 (u'20.52.101.8', (0, 12)),
 (u'10.54.0.162', (0, 1253)),
 (u'10.52.1.4', (13, 0)),
 (u'10.52.1.8', (12, 0)),
 (u'10.52.1.0', (7, 0)),
 (u'20.52.101.0', (0, 7)),
 (u'20.52.101.4', (0, 13)),
 (u'20.54.0.182', (6, 0)),
 (u'10.52.1.5', (7, 0)),
 (u'20.52.101.9', (0, 13)),
 (u'10.52.1.9', (13, 0)),
 (u'10.52.1.1', (8, 0)),
 (u'20.52.101.1', (0, 8)),
 (u'20.52.101.5', (0, 7))]

Gather statistics on our dataset


In [54]:
src_stats = rdd_joined.map(lambda x: x[1][0]).stats() # outgoing(src)
print src_stats


(count: 26, mean: 55.0, stdev: 240.011858681, max: 1253.0, min: 0.0)

In [55]:
dst_stats = rdd_joined.map(lambda x: x[1][1]).stats() # incoming(dst)
print dst_stats


(count: 26, mean: 55.0, stdev: 240.011858681, max: 1253.0, min: 0.0)

In [57]:
src_stats_bc = sc.broadcast(src_stats)
dst_stats_bc = sc.broadcast(dst_stats)

In [58]:
src_stats_bc.value


Out[58]:
(count: 26, mean: 55.0, stdev: 240.011858681, max: 1253.0, min: 0.0)

Identify IPs that have an unusually high number of outgoing or incoming connections


In [64]:
rdd_joined.filter(lambda x: x[1][0] > 0).filter( lambda x: (x[1][0] - src_stats_bc.value.mean()) > (3*src_stats_bc.value.stdev()) ).collect()


Out[64]:
[(u'20.54.0.162', (1253, 0))]

In [65]:
rdd_joined.filter(lambda x: x[1][1] > 0).filter( lambda x: (x[1][1] - dst_stats_bc.value.mean()) > (3*dst_stats_bc.value.stdev()) ).collect()


Out[65]:
[(u'10.54.0.162', (0, 1253))]
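
The two filters can also be combined into a single pass that flags an IP if it is anomalous in either direction. A sketch reusing the broadcast statistics above, with the same 3-sigma thresholds:

anomalies = rdd_joined.filter(
    lambda x: (x[1][0] - src_stats_bc.value.mean()) > (3 * src_stats_bc.value.stdev())
           or (x[1][1] - dst_stats_bc.value.mean()) > (3 * dst_stats_bc.value.stdev())
)
anomalies.collect()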

Clean up


In [66]:
src_stats_bc.unpersist()
dst_stats_bc.unpersist()
esrdd.unpersist()
print esrdd.is_cached


False

In [67]:
sc.stop()