In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext('local', 'PySpark - SQL')
In [2]:
# !wget https://archive.ics.uci.edu/ml/machine-learning-databases/kddcup99-mld/kddcup.data.gz
In [3]:
raw_data = sc.textFile("kddcup.data.gz")
In [4]:
from pyspark.sql import Row, SQLContext
sql_context = SQLContext(sc)
csv = raw_data.map(lambda l: l.split(","))
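As a quick sanity check (our addition, not part of the original run), the first parsed record can be inspected; each line of the KDD Cup 99 data splits into 41 features plus a label:

csv.first()[:3]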
In [5]:
rows = csv.map(lambda p: Row(duration=int(p[0]), protocol=p[1], service=p[2]))
In [6]:
# Temporary tables: register the DataFrame so it can be queried with SQL
df = sql_context.createDataFrame(rows)
df.registerTempTable("rdd")
In [8]:
sql_context.sql("""SELECT duration FROM rdd WHERE protocol = 'tcp' AND duration > 2000""").show()
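On Spark 2.x, registerTempTable is deprecated in favor of createOrReplaceTempView on a SparkSession; a minimal equivalent sketch, assuming a SparkSession named spark:

# Spark 2.x equivalent (assumes a SparkSession `spark`)
df.createOrReplaceTempView("rdd")
spark.sql("SELECT duration FROM rdd WHERE protocol = 'tcp' AND duration > 2000").show()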
In [9]:
df.select("duration").filter(df.duration > 2000).filter(df.protocol == "tcp").show()
In [10]:
df.describe().show()
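Everything above also illustrates Spark's lazy evaluation: map, filter, and select only build an execution plan, and no data is read until an action such as show(), collect(), or count() runs. A minimal sketch (variable name ours):

# this line returns immediately - no data is touched yet
tcp_only = csv.filter(lambda p: p[1] == "tcp")
# the action below is what actually triggers the computation
print(tcp_only.count())

The Scala tests that follow make the same point explicit.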
//TESTING: Setup in Scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.scalatest.{FunSuite, Matchers}

//case classes live at the top level so Spark can serialize them in closures
case class InputRecord(userId: String)
case class UserTransaction(userId: String, amount: Int)

class DeferComputations extends FunSuite with Matchers {
  val spark: SparkContext = SparkSession
    .builder()
    .master("local[2]")
    .getOrCreate().sparkContext

  //ACTUAL TEST
  test("should defer computations") {
    //given
    val input = spark.makeRDD(
      List(InputRecord(userId = "A"),
        InputRecord(userId = "B")))

    //when apply transformation
    val rdd = input
      .filter(_.userId.contains("A"))
      .keyBy(_.userId)
      .map(_._2.userId.toLowerCase)
    //.... the processing graph is built lazily; nothing has executed yet

    def shouldExecutePartOfCode(): Boolean = {
      //some domain logic that decides if we still need to calculate
      true
    }

    if (shouldExecutePartOfCode()) {
      //rdd.saveAsTextFile("") ||
      rdd.collect().toList
    } else {
      //condition changed - no need to evaluate the DAG
    }
  }
test("should trigger computations using actions") {
//given
val input = spark.makeRDD(
List(
UserTransaction(userId = "A", amount = 1001),
UserTransaction(userId = "A", amount = 100),
UserTransaction(userId = "A", amount = 102),
UserTransaction(userId = "A", amount = 1),
UserTransaction(userId = "B", amount = 13)))
// To get transactions for a specific userId
//when apply transformation
val rdd = input
.groupBy(_.userId)
.map(x => (x._1,x._2.toList))
.collect()
.toList
//then
rdd should contain theSameElementsAs List(
("B", List(UserTransaction("B", 13))),
("A", List(
UserTransaction("A", 1001),
UserTransaction("A", 100),
UserTransaction("A", 102),
UserTransaction("A", 1))
)
)
}
}
test("should use reduce API") {
//given
val input = spark.makeRDD(List(
UserTransaction("A", 10),
UserTransaction("B", 1),
UserTransaction("A", 101)
))
//when
val result = input
.map(_.amount)
.reduce((a, b) => if (a > b) a else b)
//then
assert(result == 101)
}
test("should use reduceByKey API") {
//given
val input = spark.makeRDD(
List(
UserTransaction("A", 10),
UserTransaction("B", 1),
UserTransaction("A", 101)
)
)
//when
val result = input
.keyBy(_.userId)
.reduceByKey((firstTransaction, secondTransaction) =>
TransactionChecker.higherTransactionAmount(firstTransaction, secondTransaction))
.collect()
.toList
//then
result should contain theSameElementsAs
List(("B", UserTransaction("B", 1)), ("A", UserTransaction("A", 101)))
}
}

object TransactionChecker {
  def higherTransactionAmount(firstTransaction: UserTransaction,
                              secondTransaction: UserTransaction): UserTransaction = {
    if (firstTransaction.amount > secondTransaction.amount) firstTransaction
    else secondTransaction
  }
}
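For readers following along in the PySpark notebook above, the same keyed reduction can be sketched in Python; the data here is made up for illustration:

# keep, per user, the transaction with the higher amount
transactions = sc.parallelize([("A", 10), ("B", 1), ("A", 101)])
highest = transactions.reduceByKey(lambda a, b: a if a > b else b)
print(highest.collect())  # [('A', 101), ('B', 1)] - order may vary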
test("should trigger computations using actions") {
//given
val input = spark.makeRDD(
List(
UserTransaction(userId = "A", amount = 1001),
UserTransaction(userId = "A", amount = 100),
UserTransaction(userId = "A", amount = 102),
UserTransaction(userId = "A", amount = 1),
UserTransaction(userId = "B", amount = 13)))

    //when apply transformation
    val rdd = input
      .filter(_.userId.contains("A"))
      .keyBy(_.userId)
      .map(_._2.amount)

    //then
    println(rdd.collect().toList)
    println(rdd.count()) //and all count*
    // Simple check
    println(rdd.first())
    rdd.foreach(println(_))
    rdd.foreachPartition(t => t.foreach(println(_)))
    println(rdd.max())
    println(rdd.min())
    //takeOrdered() - needs to execute the DAG and sort everything [TIME CONSUMING!!!]
    println(rdd.takeOrdered(1).toList)
    println(rdd.takeSample(false, 2).toList)
  }

  //then every call to an action means walking up the RDD chain again;
  //if we load data from an external file system (e.g. HDFS), every action
  //means another read from that file system.
  test("should measure time of actions without cache") {
    //given the same transactions and transformations as above
    val rdd = spark.makeRDD(List(
        UserTransaction(userId = "A", amount = 1001),
        UserTransaction(userId = "A", amount = 100),
        UserTransaction(userId = "A", amount = 102),
        UserTransaction(userId = "A", amount = 1),
        UserTransaction(userId = "B", amount = 13)))
      .filter(_.userId.contains("A"))
      .keyBy(_.userId)
      .map(_._2.amount)

    val start = System.currentTimeMillis()
    println(rdd.collect().toList)
    println(rdd.count())
    println(rdd.first())
    rdd.foreach(println(_))
    rdd.foreachPartition(t => t.foreach(println(_)))
    println(rdd.max())
    println(rdd.min())
    println(rdd.takeOrdered(1).toList)
    println(rdd.takeSample(false, 2).toList)
    val result = System.currentTimeMillis() - start
    println(s"time taken (no-cache): $result")
  }

  // Using Caching
  test("should measure time of actions with cache") {
    //when apply transformation and cache the result
    val rdd = spark.makeRDD(List(
        UserTransaction(userId = "A", amount = 1001),
        UserTransaction(userId = "A", amount = 100),
        UserTransaction(userId = "A", amount = 102),
        UserTransaction(userId = "A", amount = 1),
        UserTransaction(userId = "B", amount = 13)))
      .filter(_.userId.contains("A"))
      .keyBy(_.userId)
      .map(_._2.amount)
      .cache()

    //rerun the same actions - the first action materializes the cache,
    //and subsequent actions read from it instead of re-evaluating the DAG
    val start = System.currentTimeMillis()
    println(rdd.collect().toList)
    println(rdd.count())
    println(rdd.first())
    rdd.foreach(println(_))
    rdd.foreachPartition(t => t.foreach(println(_)))
    println(rdd.max())
    println(rdd.min())
    println(rdd.takeOrdered(1).toList)
    println(rdd.takeSample(false, 2).toList)
    val result = System.currentTimeMillis() - start
    println(s"time taken (cache): $result")
  }
}
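The caching experiment translates directly back to the PySpark notebook; a sketch using the KDD data loaded earlier (the helper name and choice of actions are ours):

import time

durations = raw_data.map(lambda l: int(l.split(",")[0]))

def run_actions(rdd):
    # each action re-walks the RDD chain unless the data is cached
    start = time.time()
    rdd.count()
    rdd.max()
    rdd.min()
    return time.time() - start

print("no cache:", run_actions(durations))
durations.cache()  # the first action after this materializes the cache
print("cached:", run_actions(durations))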