Introduction

Big data & Hadoop

There was a time when a researcher could gather all the available data in their field of knowledge in a small library at home and produce results using a pen and a sheet of paper. With personal computers and laptops we have been able to extend our storage and processing power farther than we ever expected, but even they can no longer cope with the amounts of data we face today.

Nowadays, scientific experiments generate such amounts of data that they do not fit in a personal computer, and sometimes not even in a data center such as PIC. This enormous need for computing and storage resources is one of the factors driving worldwide scientific collaborations. Moreover, the dramatic increase in capacity and performance required by current experiments calls for specific architectures to store and process all this data.

Big Data platforms are a combination of hardware and software designed to handle massive amounts of data. The most popular one is Hadoop. Hadoop is based on the design originally published by Google in several papers and comprises, among other components, a:

  • distributed file system (HDFS)
  • MapReduce programming model

HDFS

The Hadoop Distributed File System (HDFS) is the basis of the Hadoop platform, and it is built to work on top of commodity computer clusters. In this architecture, from dozens up to thousands of inexpensive computers work in a coordinated manner to store and process the data. Due to the large number of elements involved (computer components, network, power, etc.), the platform was designed from the ground up to be fault tolerant: should any element fail at any time, the system detects the condition and recovers from it transparently, without the user ever noticing.

HDFS works by splitting files into 128 MiB blocks and replicating them across the cluster nodes in such a way that, if a node fails, the data is still accessible from another replica.
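As a rough illustration of the idea (this is not the actual HDFS implementation; the node names and the round-robin placement below are made up), the following sketch shows how a file could be split into 128 MiB blocks and each block assigned to several nodes:

In [ ]:
# Conceptual sketch only: splitting a file into blocks and choosing replica nodes.
import math

BLOCK_SIZE = 128 * 1024**2   # 128 MiB
REPLICATION = 3              # HDFS replicates each block 3 times by default
nodes = ['node%02d' % i for i in range(10)]

def place_blocks(file_size, nodes, block_size=BLOCK_SIZE, replication=REPLICATION):
    """Return (block_index, replica_nodes) pairs for a file of `file_size` bytes."""
    n_blocks = math.ceil(file_size / block_size)
    placement = []
    for b in range(n_blocks):
        # Naive round-robin placement; real HDFS is rack- and load-aware
        replicas = [nodes[(b + r) % len(nodes)] for r in range(replication)]
        placement.append((b, replicas))
    return placement

# A 1 GiB file is split into 8 blocks, each stored on 3 different nodes
for block, replicas in place_blocks(1024**3, nodes):
    print(block, replicas)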

MapReduce

MapReduce is a programming model used for processing and generating big data sets with parallel, distributed algorithms. Inspired by the map and reduce functions commonly used in functional programming, its key contribution is the scalability and fault tolerance provided by its optimized execution engine.

In MapReduce, data operations are defined on data structured in (key, value) pairs (a toy implementation is sketched after the list):

  • Map takes one pair of data in one data domain and returns a list of pairs in a different domain:
    Map(k1,v1) → list(k2,v2)
    
    The Map function is applied in parallel to every pair (keyed by k1) in the input dataset. This produces a list of pairs (keyed by k2) for each call. After that, the MapReduce framework collects all pairs with the same key (k2) from all lists and groups them together, creating one group for each key.
  • Reduce is then applied in parallel to each group, which in turn produces a collection of values in the same domain:
    Reduce(k2, list (v2)) → list(v3)
    
    Each Reduce call typically produces either one value v3 or an empty return, though one call is allowed to return more than one value. The returns of all calls are collected as the desired result list.
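To make the model concrete, here is a minimal, single-process sketch of a MapReduce word count. It only mimics the data flow (map, shuffle/group by key, reduce); a real framework runs these phases in parallel on many machines and handles failures for you:

In [ ]:
# Toy, single-process MapReduce word count (illustration only)
from collections import defaultdict

documents = {'doc1': 'to be or not to be', 'doc2': 'to thine own self be true'}

# Map(k1, v1) -> list(k2, v2): emit a (word, 1) pair for every word in a document
def map_fn(doc_id, text):
    return [(word, 1) for word in text.split()]

# Reduce(k2, list(v2)) -> list(v3): sum all the counts emitted for one word
def reduce_fn(word, counts):
    return [sum(counts)]

# Shuffle phase: group the emitted values by key
groups = defaultdict(list)
for doc_id, text in documents.items():
    for key, value in map_fn(doc_id, text):
        groups[key].append(value)

# Reduce phase: one call per key
{word: reduce_fn(word, counts)[0] for word, counts in groups.items()}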

Apache Spark

Apache Spark is an open-source cluster-computing framework that can run on top of Apache Hadoop. Built on the MapReduce model, it offers an improved interface for non-linear algorithms and operations. Apache Spark is based on a specialized data structure called the resilient distributed dataset (RDD). RDDs facilitate the implementation of iterative algorithms and of interactive/exploratory analysis. Compared to a pure MapReduce implementation, the latency of Spark applications may be reduced by several orders of magnitude.

Apache Spark comprises several modules that add further processing capabilities on top of RDDs, such as:

  • Spark SQL: structured data like database result sets
  • Spark Streaming: real-time data
  • Spark MLlib: machine learning
  • Spark GraphX: graph processing

For this course, we will introduce the mechanics of working with large datasets using Spark. Ideally, each one of you would have an entire Hadoop cluster to work with but, as we are not CERN, we make use of Spark's ability to run locally, without a cluster. Later, you could run the same code on top of a Hadoop cluster without changing anything.

Map & Reduce

Note:

Spark operations can be classified as either:

  • ACTIONS: trigger a computation and return a result
    • reduce, collect, aggregate, count, take, ...
  • TRANSFORMATIONS: return a new RDD with the transformation applied (think of composing functions)
    • map, flatMap, filter, groupBy, join, ...

In [ ]:
# Create a SparkContext that runs Spark locally, using all available cores
import pyspark
sc = pyspark.SparkContext('local[*]')

In [ ]:
# We define our input
l = range(10)
l

In [ ]:
# We "upload" it as an RDD
rdd = sc.parallelize(l)
rdd

map()


In [ ]:
# We define a map function
def power_of_2(k):
    return 2**k

# And we apply it to our RDD
rdd.map(power_of_2)

In [ ]:
# map() is a transformation, so the previous cell did not actually compute anything.
# We use the collect() action to trigger the computation and retrieve all results.
rdd.map(power_of_2).collect()

### WARNING ###
# Never do this with a real dataset, or you will transfer ALL the data to your browser, effectively killing it.

reduce()


In [ ]:
# What about summing everything?
# We define a reduce function
def sum_everything(k1, k2):
    return k1 + k2

# And we apply the reduce operation
rdd.reduce(sum_everything)

In [ ]:
# Or we can use the built-in operation `sum`
rdd.sum()

pipelining


In [ ]:
# What if I wanted to compute the sum of the powers of 2?
rdd.map(power_of_2).reduce(sum_everything)
# or 
rdd.map(power_of_2).sum()

In [ ]:
# How can we count the number of elements in the array?
rdd.count()

Ok, too easy, this is supposed to be a map & reduce tutorial...

How can we do it WITHOUT the count() action, using just map & reduce?

SPOILER: you could add 1 for each element in the RDD:

  • Build a map function that transforms every element into a 1.
  • Then apply our sum_everything reduce function.

In [ ]:
def set_to_1(k):
    return 1

rdd.map(set_to_1).reduce(sum_everything)

RDD


In [ ]:
# Load all of Shakespeare's works as an RDD of text lines
shakespeare = sc.textFile('../resources/shakespeare.txt')

In [ ]:
# Show the first lines
shakespeare.take(10)

In [ ]:
# Get the longest line
def keep_longest(k1, k2):
    if len(k1) > len(k2):
        return k1
    else:
        return k2

shakespeare.reduce(keep_longest)

In [ ]:
# Compute the average line length
def line_length(k):
    return len(k)

shakespeare.map(line_length).sum() / shakespeare.count()

flatMap() vs map()


In [ ]:
# Split the text in words
def split_in_words(k):
    return k.split()

shakespeare.map(split_in_words).take(2)

In [ ]:
# flatMap() flattens the per-line lists into a single RDD of words
shakespeare.flatMap(split_in_words).take(15)

lambda functions


In [ ]:
shakespeare.flatMap(
    lambda k: k.split() # Split in words
).take(15)

filter()


In [ ]:
# Retrieve 10 words longer than 15 characters
shakespeare.flatMap(
    lambda k: k.split() # Split in words
).filter(
    lambda k: len(k)>15 # Keep words longer than 15 characters
).take(10)

Exercise

How many times is the word 'murder' used? (case-insensitive)


In [ ]:
%load -r 1-9 solutions/13_01_Big_Data.py

Exercise

Show 10 words longer than 15 characters


In [ ]:
%load -r 10-19 solutions/13_01_Big_Data.py

Exercise

Show all words longer than 15 characters, dropping those that contain any of the following characters: . , -


In [ ]:
%load -r 20-29 solutions/13_01_Big_Data.py

Exercise

Retrieve the longest word (without . , -), reusing the keep_longest reduce function.


In [ ]:
%load -r 30-39 solutions/13_01_Big_Data.py

Which, as you all know, means "the state of being able to achieve honours".

Key-Value RDD

We want to count the number of appearances of every word


In [ ]:
words = shakespeare.flatMap(
    lambda k: k.split()                 # Split in words
).filter(
    lambda k: not (set('.,-') & set(k)) # Drop words with special characters
)

groupBy()


In [ ]:
# Group the words by their own value and peek at the first 10 groups
words.groupBy(lambda k: k).take(10)

In [ ]:
# groupBy() returns, for each distinct word, an iterable containing all of its appearances.
# Let's print its contents

tuples = words.groupBy(lambda k: k).take(5)

for t in tuples:
    print(t[0], list(t[1]))

In [ ]:
# Now, to compute the number of appearances, we just have to count the elements in the iterator

words.groupBy(
    lambda k: k
).map(
    lambda t: (t[0], len(list(t[1])))
).take(5)

In [ ]:
# But this is VERY EXPENSIVE in terms of memory,
# as all the word instances must be stored in a list before they can be counted.

# We can do it much better!

reduceByKey()


In [ ]:
# First, emit a (word, 1) pair for each word
words.map(
    lambda w: (w, 1)
).take(10)

In [ ]:
# Then sum the 1s of each distinct word with reduceByKey()
words.map(
    lambda w: (w, 1)
).reduceByKey(
    lambda k1, k2: k1 + k2
).take(10)

Exercise

Get the 10 most-used words and their number of appearances


In [ ]:
%load -r 40-49 solutions/13_01_Big_Data.py

Exercise

  1. Print the 10 most-used words longer than 5 characters (case-insensitive)
  2. How many words longer than 5 characters are used more than 500 times? (case-insensitive)

In [ ]:
%load -r 50-69 solutions/13_01_Big_Data.py

In [ ]:
%load -r 70-79 solutions/13_01_Big_Data.py

DataFrame


In [ ]:
# Create an SQLContext, the entry point for the DataFrame and SQL interfaces
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

In [ ]:
# Load a Gaia catalogue sample from a compressed CSV file, inferring the column types
gaia = sqlc.read.csv('../resources/gaia.csv.bz2', comment='#', header=True, inferSchema=True)
gaia

In [ ]:
gaia.count()

In [ ]:
gaia.head(5)

Pandas-like interface


In [ ]:
%matplotlib inline

import pyspark.sql.functions as func

In [ ]:
# Histogram of the g magnitude in 0.1-magnitude bins:
# floor(mag_g * 10) / 10 is the bin value, then we count the rows per bin and sort
g_hist = gaia.groupBy(
    (
        func.floor(gaia.mag_g * 10) / 10
    ).alias('mag_g'),
).count().orderBy(
    'mag_g'
)
g_hist.take(10)

In [ ]:
# The aggregated result is small, so it is safe to convert it to pandas and plot it
g_hist.toPandas().set_index('mag_g').plot(loglog=True)

Exercise

Plot an 'ra' histogram in 1-degree bins (count how many stars are in each bin).

Can you spot the Galactic center? ;)


In [ ]:
%load -r 90-99 solutions/13_01_Big_Data.py

SQL interface


In [ ]:
# Register the DataFrame as a table so it can be queried with SQL
sqlc.registerDataFrameAsTable(gaia, "gaia")

# The same histogram as before, expressed in SQL
g_hist = sqlc.sql("""
    SELECT CAST(FLOOR(mag_g*10)/10. AS FLOAT) AS mag_g, COUNT(*) AS `count`
    FROM gaia
    GROUP BY 1
    ORDER BY 1
""")

g_hist.take(10)

In [ ]:
g_hist.toPandas().set_index('mag_g').plot(loglog=True)

Exercise

Plot an 'ra' histogram in 1-degree bins (count how many stars are in each bin).

Can you spot the Galactic center? ;)


In [ ]:
%load -r 100-109 solutions/13_01_Big_Data.py