In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext('local', 'PySpark - SQL')

In [2]:
# !wget https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz

In [3]:
raw_data = sc.textFile("kddcup.data.gz")

In [4]:
from pyspark.sql import Row, SQLContext
sql_context = SQLContext(sc)
csv = raw_data.map(lambda l: l.split(","))

In [5]:
rows = csv.map(lambda p: Row(duration=int(p[0]), protocol=p[1], service=p[2]))

In [6]:
# Temporary tables (registerTempTable is the pre-Spark 2.0 API; createOrReplaceTempView supersedes it)
df = sql_context.createDataFrame(rows)
df.registerTempTable("rdd")

In [8]:
sql_context.sql("""SELECT duration FROM rdd WHERE protocol = 'tcp' AND duration > 2000""").show()


+--------+
|duration|
+--------+
|   10217|
|   11610|
|   13724|
|   10934|
|   12026|
|    5506|
|   12381|
|    9733|
|   17932|
|   40504|
|   11565|
|   12454|
|    9473|
|   12865|
|   11288|
|   10501|
|   14479|
|   10774|
|   10007|
|   12828|
+--------+
only showing top 20 rows


In [9]:
df.select("duration").filter(df.duration>2000).filter(df.protocol=="tcp").show()


+--------+
|duration|
+--------+
|   10217|
|   11610|
|   13724|
|   10934|
|   12026|
|    5506|
|   12381|
|    9733|
|   17932|
|   40504|
|   11565|
|   12454|
|    9473|
|   12865|
|   11288|
|   10501|
|   14479|
|   10774|
|   10007|
|   12828|
+--------+
only showing top 20 rows


In [10]:
df.describe()


Out[10]:
DataFrame[summary: string, duration: string, protocol: string, service: string]

Testing - Transformations (Scala)

//TESTING:  Setup in Scala (assumes ScalaTest 3.0.x - FunSuite plus Matchers for the `should` assertions)
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.scalatest.{FunSuite, Matchers}

//domain classes used by the tests - kept at the top level so Spark can serialize their instances
case class InputRecord(userId: String)
case class UserTransaction(userId: String, amount: Int)

class DeferComputations extends FunSuite with Matchers {
    val spark: SparkContext = SparkSession
                                .builder()
                                .master("local[2]")
                                .getOrCreate().sparkContext


//ACTUAL TEST
test("should defer computations") {
 //given
    val input = spark.makeRDD(
        List(InputRecord(userId = "A"),
            InputRecord(userId = "B"))) 

case class InputRecord(userId: String)
//when apply transformation
val rdd = input
    .filter(_.userId.contains("A"))
    .keyBy(_.userId)
.map(_._2.userId.toLowerCase)
//.... built processing graph lazy

    //some domain logic that decides if we still need to calculate
    def shouldExecutePartOfCode(): Boolean = true

    if (shouldExecutePartOfCode()) {
        //rdd.saveAsTextFile("") or
        rdd.collect().toList
    } else {
        //condition changed - no need to evaluate the DAG
    }
}

test("should trigger computations using actions") {
 //given
 val input = spark.makeRDD(
     List(
         UserTransaction(userId = "A", amount = 1001),
         UserTransaction(userId = "A", amount = 100),
         UserTransaction(userId = "A", amount = 102),
         UserTransaction(userId = "A", amount = 1),
         UserTransaction(userId = "B", amount = 13)))


// To get transactions for a specific userId 
//when apply transformation
val rdd = input
    .groupBy(_.userId)
    .map(x => (x._1,x._2.toList))
    .collect()
    .toList

//then
rdd should contain theSameElementsAs List(
    ("B", List(UserTransaction("B", 13))),
    ("A", List(
        UserTransaction("A", 1001),
        UserTransaction("A", 100),
        UserTransaction("A", 102),
        UserTransaction("A", 1))
    )
  )
 }
}
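
To make the deferred execution visible outside of a test, the lineage that the transformations record can be printed with toDebugString before any action runs. The following standalone sketch is illustrative only (the object name LineageInspection is made up for this example); it reuses the InputRecord case class from the setup above:

import org.apache.spark.sql.SparkSession

//standalone sketch - inspect the lazily built lineage without triggering a job
object LineageInspection {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("lineage").getOrCreate()
    val sc = spark.sparkContext

    val input = sc.makeRDD(List(InputRecord(userId = "A"), InputRecord(userId = "B")))

    //transformations only record steps in the DAG - no job is submitted here
    val rdd = input
      .filter(_.userId.contains("A"))
      .keyBy(_.userId)
      .map(_._2.userId.toLowerCase)

    //toDebugString prints the recorded lineage; still no computation
    println(rdd.toDebugString)

    //only this action actually runs the graph
    println(rdd.collect().toList)

    spark.stop()
  }
}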

Using reduce and reduceByKey instead of groupBy

test("should use reduce API") {
    //given
    val input = spark.makeRDD(List(
    UserTransaction("A", 10),
    UserTransaction("B", 1),
    UserTransaction("A", 101)
    ))

     //when
val result = input
    .map(_.amount)
    .reduce((a, b) => if (a > b) a else b)

//then
assert(result == 101)
}
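
The manual reduce above simply picks the larger of two amounts, so the built-in max action gives the same result; reduce is shown because it generalizes to any associative function. As a hedged companion example in the same suite (the test name is invented here, reusing the suite's spark and the UserTransaction case class):

test("should compute the maximum amount with the max action") {
    //given - same input as in the reduce test above
    val input = spark.makeRDD(List(
        UserTransaction("A", 10),
        UserTransaction("B", 1),
        UserTransaction("A", 101)
    ))

    //when - max() relies on the implicit Ordering[Int]
    val result = input.map(_.amount).max()

    //then
    assert(result == 101)
}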


test("should use reduceByKey API") {
    //given
    val input = spark.makeRDD(
    List(
        UserTransaction("A", 10),
        UserTransaction("B", 1),
        UserTransaction("A", 101)
    )
)


    //when
    val result = input
      .keyBy(_.userId)
      .reduceByKey((firstTransaction, secondTransaction) =>
        TransactionChecker.higherTransactionAmount(firstTransaction, secondTransaction))
      .collect()
      .toList


        //then
    result should contain theSameElementsAs
      List(("B", UserTransaction("B", 1)), ("A", UserTransaction("A", 101)))
  }

}



object TransactionChecker {
    def higherTransactionAmount(firstTransaction: UserTransaction,
                                secondTransaction: UserTransaction): UserTransaction = {
        if (firstTransaction.amount > secondTransaction.amount) firstTransaction else secondTransaction
    }
}
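
reduceByKey is not limited to picking the higher transaction: any associative combine works, and because values are merged on each partition before the shuffle, far less data crosses the network than with groupBy. A minimal standalone sketch (the object name SumPerUser is hypothetical) that sums the amount per user:

import org.apache.spark.sql.SparkSession

//hypothetical sketch - total transaction amount per userId via reduceByKey
object SumPerUser {
  case class UserTransaction(userId: String, amount: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sum-per-user").getOrCreate()
    val sc = spark.sparkContext

    val input = sc.makeRDD(List(
      UserTransaction("A", 10),
      UserTransaction("B", 1),
      UserTransaction("A", 101)))

    //map to (userId, amount) pairs so only small Int values travel through the shuffle,
    //and reduceByKey can pre-aggregate them on the map side
    val totals = input
      .map(t => (t.userId, t.amount))
      .reduceByKey(_ + _)
      .collect()
      .toList

    println(totals) //e.g. List((A,111), (B,1)) - the order of keys is not guaranteed

    spark.stop()
  }
}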

Actions - Computations

test("should trigger computations using actions") {
     //given
     val input = spark.makeRDD(
     List(
         UserTransaction(userId = "A", amount = 1001),
         UserTransaction(userId = "A", amount = 100),
         UserTransaction(userId = "A", amount = 102),
         UserTransaction(userId = "A", amount = 1),
         UserTransaction(userId = "B", amount = 13)))

//when apply transformation
 val rdd = input
     .filter(_.userId.contains("A"))
     .keyBy(_.userId)
     .map(_._2.amount)

//then
 println(rdd.collect().toList)
 println(rdd.count()) //and all count*


// Simple check
 println(rdd.first())


 rdd.foreach(println(_))
 rdd.foreachPartition(t => t.foreach(println(_)))
 println(rdd.max())
 println(rdd.min())

 //takeOrdered() - needs to execute DAT and sort everything [TIME CONSUMING!!!]
 println(rdd.takeOrdered(1).toList)
 println(rdd.takeSample(false, 2).toList)
 }
}
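
takeOrdered(n) returns the n smallest elements according to the implicit Ordering, and its mirror image top(n) returns the n largest; both have to evaluate the whole dataset, which is why they are flagged as time consuming above. A small hedged sketch (the object name OrderedActions is made up) on plain Ints:

import org.apache.spark.sql.SparkSession

//hypothetical sketch - takeOrdered vs top on a small RDD of amounts
object OrderedActions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("ordered-actions").getOrCreate()
    val sc = spark.sparkContext

    val amounts = sc.makeRDD(List(1001, 100, 102, 1, 13))

    //n smallest elements, ascending
    println(amounts.takeOrdered(2).toList)   //List(1, 13)

    //n largest elements, descending
    println(amounts.top(2).toList)           //List(1001, 102)

    spark.stop()
  }
}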

Reuse the RDD to minimize time

//then: every call to an action means going back up the RDD chain;
//if we are loading data from an external file system (e.g. HDFS), every action means
//that we need to load it from the FS again.
    val start = System.currentTimeMillis()
    println(rdd.collect().toList)
    println(rdd.count())
    println(rdd.first())
    rdd.foreach(println(_))
    rdd.foreachPartition(t => t.foreach(println(_)))
    println(rdd.max())
    println(rdd.min())
    println(rdd.takeOrdered(1).toList)
    println(rdd.takeSample(false, 2).toList)
    val result = System.currentTimeMillis() - start

    println(s"time taken (no-cache): $result")

}


// Using Caching
//when apply transformation
val rdd = input
    .filter(_.userId.contains("A"))
    .keyBy(_.userId)
    .map(_._2.amount)
    .cache()

//Rerun the same sequence of actions
//this time the lineage above .cache() is computed only once: the first action
//materializes the cache and every subsequent action reads from it.
    val start = System.currentTimeMillis()
    println(rdd.collect().toList)
    println(rdd.count())
    println(rdd.first())
    rdd.foreach(println(_))
    rdd.foreachPartition(t => t.foreach(println(_)))
    println(rdd.max())
    println(rdd.min())
    println(rdd.takeOrdered(1).toList)
    println(rdd.takeSample(false, 2).toList)
    val result = System.currentTimeMillis() - start

    println(s"time taken(cache): $result")


    }
}
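
cache() is shorthand for persist(StorageLevel.MEMORY_ONLY). When the cached data might not fit in memory, or should be released once it is no longer reused, persist and unpersist give finer control. A hedged standalone sketch (the object name PersistExample is hypothetical):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

//hypothetical sketch - explicit storage level plus releasing the cache when done
object PersistExample {
  case class UserTransaction(userId: String, amount: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("persist-example").getOrCreate()
    val sc = spark.sparkContext

    val input = sc.makeRDD(List(
      UserTransaction("A", 1001),
      UserTransaction("A", 100),
      UserTransaction("B", 13)))

    //MEMORY_AND_DISK spills partitions to disk instead of recomputing them when memory is tight
    val amounts = input
      .filter(_.userId.contains("A"))
      .map(_.amount)
      .persist(StorageLevel.MEMORY_AND_DISK)

    //the first action materializes and caches; later actions reuse the cached partitions
    println(amounts.count())
    println(amounts.max())

    //release the cached blocks once the RDD is no longer needed
    amounts.unpersist()
    spark.stop()
  }
}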
