Week 3 Exercise Questions

Today's Session Outline:-

  1. Lecture 3 Examples
    • Review
    • Shuffling
    • Partitions
    • Closures
    • Cache and Persist
    • Broadcast Variables
    • Accumulators
  2. Spark SQL
    • Review
    • Data Sources
  3. Exercise 1 discussion

Next Week's Session Outline:-

  • TBA
  • Cluster environment configuration
  • Spark configuration (SparkConf) using external files
  • Exercise 1 Solutions
  • Exercise 2 discussion

Things to remember:-

  • Learn your hardware configuration like RAM, CPU cores, etc.
  • All Spark functions will be given along with the questions; you have to fill in each Spark function with its respective parameters
    and write the corresponding Scala or Python logic
  • This is a practice session, so no scores are calculated
  • For quicker programming, we will use the shell environment today
  • If your IDE configurations aren't working, approach us after the exercise session
  • Of course, solutions will be provided for these questions after this exercise session
  • If you are already familiar with the contents listed above, go ahead and move on to Spark SQL

1. Lecture 3 Examples

Review

  • RDD transformations are lazy.
  • persist stores data at different storage levels.
  • cache (actually a specialized persist) stores data only in memory.
  • Narrow transformations - map, filter; each output partition is computed from a single input partition, so no shuffle is needed
  • Wide transformations - groupByKey, reduceByKey; output partitions depend on data from many input partitions, causing a shuffle
  • Partitions - refer to last week's exercise session notes for examples
  • Closure - the variables and methods a function captures from its enclosing scope; Spark serializes them and ships them with each task
  • Shared variables - variables shared across tasks:
    • Broadcast variables - read-only values cached on every machine
    • Accumulators - variables that tasks can only add to, e.g. counters

Sources:

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-caching.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-transformations.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-shuffle.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-dagscheduler.html

1.1 Shuffle

  • By default, Spark handles the data movement behind the scenes using map and reduce tasks.
  • But during a shuffle, we spend precious disk and network resources.

RDD Shuffle

Have you watched your console during heavy data processing? You might have seen something like this:
[Stage 2:=======>                                             (143 + 20) / 1000]
So what is a stage? A wide dependency starts a new stage, while consecutive narrow dependencies are grouped together into a single stage.
You can find the stages of the tasks executed in a Spark session at
http://localhost:4040/stages/
For example,

Question 1 - Filtering random numbers

  • create an RDD with a group of random numbers
  • remove the negative values
  • now filter it using a range condition, e.g. with > and <
  • did you notice any changes in the Web UI before performing any actions?
  • count each filtered RDD
  • is any shuffling visible in the Web UI? (a sketch of one possible solution follows this list)
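
One possible solution sketch in PySpark, assuming the pyspark shell where a SparkContext `sc` is already defined (the number range, count, and thresholds below are made up):

```python
import random

# Generate some random numbers; the range and count here are arbitrary.
numbers = sc.parallelize([random.randint(-100, 100) for _ in range(1000)])

non_negative = numbers.filter(lambda n: n >= 0)          # remove negative values
in_range = non_negative.filter(lambda n: 10 < n < 50)    # range condition with > and <

# Nothing shows up in the Web UI yet: filter is a lazy, narrow transformation.
print(non_negative.count())
print(in_range.count())
# Both counts run as single-stage jobs, so no shuffle appears under
# http://localhost:4040/stages/
```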

Question 2 - Word count

  • create a word-count function for the README.md in your Spark home folder
  • use the reduceByKey function (see the sketch below)
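
A hedged sketch, assuming the pyspark shell and that README.md is reachable from the working directory (adjust the path to your Spark home folder):

```python
def word_count(path):
    lines = sc.textFile(path)
    words = lines.flatMap(lambda line: line.split())
    pairs = words.map(lambda word: (word, 1))
    # reduceByKey is a wide transformation and triggers a shuffle
    return pairs.reduceByKey(lambda a, b: a + b)

counts = word_count("README.md")
print(counts.take(10))
```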

Question 3

  • In the above two examples, if you look at the stages in your Spark Web UI, they execute differently.
  • The function numbers_count finishes in a single stage.
  • The function word_count uses two stages.
  • Can you associate the types of dependencies here?
  • Across stages, operations like reduceByKey require data to be moved across partitions. Refer to the Lecture 3 partition slide, page 10.
  • The stages follow a topological order. Check the DAG visualization in the Spark Web UI, or inspect the lineage as sketched below.
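
Besides the Web UI, the RDD debug string is a quick way to see the dependency structure; indentation marks the stage (shuffle) boundaries. This assumes the `in_range` and `counts` RDDs from the sketches above:

```python
print(in_range.toDebugString().decode("utf-8"))   # narrow dependencies, single stage
print(counts.toDebugString().decode("utf-8"))     # shows a ShuffledRDD -> two stages
```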

Question 4 - Compare groupByKey and reduceByKey
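
A possible comparison sketch, reusing the word pairs from Question 2. reduceByKey combines values map-side before the shuffle, while groupByKey ships every (word, 1) pair across the network:

```python
pairs = sc.textFile("README.md") \
          .flatMap(lambda line: line.split()) \
          .map(lambda word: (word, 1))

via_group = pairs.groupByKey().mapValues(sum)
via_reduce = pairs.reduceByKey(lambda a, b: a + b)

print(via_group.take(5))
print(via_reduce.take(5))
# Compare the Shuffle Read / Shuffle Write sizes of the two jobs
# at http://localhost:4040/stages/
```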

1.2 Partition

  • Why is data partitioned?
    • so that each worker executes on its own chunk of the data efficiently
    • we can define the number of partitions in the parallelize or textFile methods
    • by default, the number of partitions is the total number of cores on all executor nodes
  • RDD type used: pair RDD
  • Types of partitioning
    • Hash partitioning - may be unbalanced when many keys end up in the same partition
    • Range partitioning - balanced when the keys spread over a known range
      Hint :- Picking a key from the data can be difficult, but database tables usually have primary keys; you can use them for range partitioning.
  • Explicit Partitioning
    • partitionBy
      • used on an RDD
      • takes a partitioner object as an argument
      • the result NEEDS TO BE PERSISTED at the end, otherwise the data is re-shuffled every time it is used
    • using transformation
      • sometimes we create a partitionable RDD through transformations, for example a pair RDD
      • syntax of a pair RDD: RDD[(K, V)]
      • Spark's defaults pick either a RangePartitioner or a HashPartitioner when one is required
      • refer to slide 31 in shuffle_partition.pdf for more detail
      • some transformations (e.g. map) drop the parent's partitioner on the resulting RDD
  • Examples
    • refer to the DAG in the Spark Web UI for the word count to learn the internals of the transformations
    • partitionBy is sketched below
    • RangePartitioner is yet to be implemented in Python (NEEDS VERIFICATION)
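
A minimal partitionBy sketch, assuming the pyspark shell. Note that PySpark's partitionBy takes a number of partitions (and an optional hash-like partition function) instead of a Partitioner object as in Scala:

```python
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("c", 4)])

partitioned = pairs.partitionBy(4).persist()   # persist, or the shuffle is repeated
print(partitioned.getNumPartitions())          # -> 4
print(partitioned.glom().map(len).collect())   # number of elements in each partition
```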

Question 5

  • Use partitionBy on Question 4
  • Analyse the difference when using partitions - explained in detail in the last exercise session (see the sketch below)
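
One possible sketch: pre-partition the word pairs from Question 4, persist them, and compare the stages in the Web UI with the un-partitioned runs (the partition count 8 is arbitrary):

```python
pairs = sc.textFile("README.md") \
          .flatMap(lambda line: line.split()) \
          .map(lambda word: (word, 1)) \
          .partitionBy(8) \
          .persist()

print(pairs.reduceByKey(lambda a, b: a + b, 8).count())
print(pairs.groupByKey(8).mapValues(sum).count())
# Compare the Shuffle Read / Shuffle Write of these jobs with the
# un-partitioned versions from Question 4.
```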

1.3 Closures

  • A closure is a function together with the variables it captures from its enclosing scope; those variables outlive the scope that created them
  • In Spark, closures are serialized and shipped to the executors, which sometimes causes serialization errors
  • More examples in next Tuesday's session with a cluster setup (a small PySpark sketch of the classic pitfall follows the references below)

References :-

http://stackoverflow.com/questions/36636/what-is-a-closure

https://en.wikipedia.org/wiki/Closure_(computer_programming)

A simple C++ example for Closure,

http://mikehadlow.blogspot.fi/2011/07/what-is-closure.html
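
And a hedged PySpark sketch of the classic closure pitfall mentioned above, assuming the pyspark shell:

```python
counter = 0
numbers = sc.parallelize(range(100))

def increment(x):
    global counter
    counter += x   # updates a serialized copy of the closure on the executor

numbers.foreach(increment)
print(counter)     # still 0 on the driver (behaviour can differ in local mode)

# For this kind of aggregation, use an accumulator instead (section 1.6).
```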

1.4 Cache & Persist

  • cache is a specialized version of persist
  • cache stores the values only in memory
  • If an RDD does not fit in memory, the missing partitions are recomputed on the fly
  • persist can store an RDD to disk, memory, or both
  • Detailed settings can be found here

http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

Examples:-

  • try it out with larger datasets, so the storage-level trade-offs become clear.
  • In the following example we use generated data, so the dataset is small.

Question 6

  • create a random set of numbers
  • store it using cache and persist
  • Use the Web UI for further analysis (a sketch follows after the hints below)

Hint:

  • unpersist before switching between cache and persist
  • If you face memory error, try to increase driver memory.
  • Set the following when initializing the SparkContext
  • spark.driver.memory 4000m

Refer here:-

http://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space
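
A hedged sketch for Question 6, assuming the pyspark shell; the dataset size is arbitrary:

```python
import random
from pyspark import StorageLevel

data = sc.parallelize([random.random() for _ in range(10 ** 6)])

data.cache()        # shorthand for persisting at the default memory level
data.count()        # materialize it, then check the Storage tab in the Web UI

data.unpersist()    # drop the cached copy before changing the storage level
data.persist(StorageLevel.MEMORY_AND_DISK)
data.count()
# http://localhost:4040/storage/ shows the storage level and the fraction cached.
```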

1.5 Broadcast Variables

  • Spark caches a read-only copy of the variable on each machine, removing the need to ship it with every task
  • The data is stored in serialized form and deserialized when it needs to be read
  • Never change the variable after it has been broadcast
  • Maintaining consistency falls upon the user

Question 7

  • declare a broadcast variable and access it through value (a sketch follows)
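
A minimal sketch, assuming the pyspark shell; the lookup table is made up:

```python
lookup = sc.broadcast({"fi": "Finland", "se": "Sweden", "no": "Norway"})

codes = sc.parallelize(["fi", "no", "fi", "se"])
countries = codes.map(lambda c: lookup.value[c])   # read on the executors via value
print(countries.collect())

lookup.unpersist()   # release the broadcast copies on the executors
```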

1.6 Accumulators

  • A variable that is updated across multiple operations, for example a counter variable
  • Tasks running in parallel add to an accumulator; only the driver can read its value

Question 8

  • declare an accumulator variable and access it through value on the driver (a sketch follows)
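
A minimal sketch, assuming the pyspark shell; counting blank lines is just one example use of a counter:

```python
blank_lines = sc.accumulator(0)

def check(line):
    global blank_lines
    if not line.strip():
        blank_lines += 1    # tasks can only add to the accumulator
    return line

sc.textFile("README.md").map(check).count()   # an action forces the updates
print(blank_lines.value)                      # the value is readable on the driver
```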

Additional Material and Sources for Section 1:-

2. Spark SQL

Review

  • SparkSession
  • DataFrames
  • GlobalTemporaryView
  • Using SQL Queries

2.1 DataFrames

  • introduced in Apache Spark 1.3; the Dataset API followed in 1.6
  • Spark SQL acts as a distributed SQL query engine
  • data is organized into named columns

2.2 DataSets

2.3 Handled Input Types

  • JSON - spark_session.read.json
  • text - spark_session.read.text
  • Apache Parquet files
  • Apache Avro files
  • Hadoop files
  • Cassandra databases

Question 9 - Review

  • Create a Spark Session
  • read people.json from the resources folder and store it in a DataFrame
  • use the show and printSchema functions on the DataFrame created from people.json
  • use select and groupBy on the DataFrame (a sketch follows)
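
A hedged sketch; the path to people.json assumes the standard Spark distribution layout and may need adjusting:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Week3Exercises").getOrCreate()

df = spark.read.json("examples/src/main/resources/people.json")
df.show()
df.printSchema()

df.select("name").show()
df.select(df["name"], df["age"] + 1).show()
df.groupBy("age").count().show()
```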

Question 10 - Creating a Data Frame from an RDD

  • Code demo (a sketch is given below)
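
One possible sketch, reusing the SparkSession `spark` from Question 9; the sample rows and column names are made up:

```python
from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([("Alice", 30), ("Bob", 25)])

# Option 1: map to Row objects and let Spark infer the schema
people = rdd.map(lambda p: Row(name=p[0], age=int(p[1])))
df = spark.createDataFrame(people)
df.show()

# Option 2: pass explicit column names
df2 = spark.createDataFrame(rdd, ["name", "age"])
df2.printSchema()
```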

3. Exercise 1 Discussion