There was a time when a researcher could gather all the available data in their field of knowledge in a small library at home and produce results with a pen and a sheet of paper. Personal computers and laptops have extended our storage and processing power far beyond what we once expected, but even they cannot cope anymore.
Nowadays, scientific experiments generate such amounts of data that they do not fit in a personal computer, not even in a data center such as PIC. This huge need for computing and storage resources is one of the factors driving worldwide scientific collaborations. Moreover, the dramatic increase in capacity and performance required by current experiments calls for specific architectures to store and process all this data.
Big Data platforms are a combination of hardware and software designed to handle massive amounts of data. The most popular one is Hadoop. Hadoop is based on the design originally published by Google in several papers and comprises, among other components:
The Hadoop Distributed File System (HDFS) is the basis of the Hadoop platform, and it is built to run on top of commodity computer clusters. In this architecture, dozens to thousands of cheap computers work in a coordinated manner to store and process the data. Due to the large number of elements involved (computer components, network, power, etc.), the platform was designed from the ground up to be fault tolerant: should any element fail at any time, the system detects the condition and recovers from it transparently, and the user never notices.
HDFS works by splitting files into 128 MiB blocks and replicating them across the cluster nodes, in such a way that if a node fails, the data is still accessible from another replica.
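As a quick back-of-the-envelope illustration (assuming the default 128 MiB block size and a replication factor of 3, which is the usual HDFS default and not something configured in this course), a 1 GiB file would be split into 8 blocks and stored as 24 block replicas spread over different nodes:
In [ ]:
import math

BLOCK_SIZE = 128 * 1024**2   # 128 MiB, the default HDFS block size
REPLICATION = 3              # assumed default HDFS replication factor

def hdfs_footprint(file_size_bytes):
    # Toy estimate: number of blocks and total stored replicas for a file
    n_blocks = math.ceil(file_size_bytes / BLOCK_SIZE)
    return n_blocks, n_blocks * REPLICATION

hdfs_footprint(1 * 1024**3)  # a 1 GiB file -> (8, 24)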
MapReduce is a programming model used for processing and generating big data sets with parallel, distributed algorithms. Inspired by the map and reduce functions commonly used in functional programming, its key contribution is the scalability and fault tolerance achieved by optimizing the execution engine.
In MapReduce, data operations are defined with respect to data structured in (key, value) pairs:
Map
takes one pair of data in one data domain and returns a list of pairs in a different domain:
Map(k1,v1) → list(k2,v2)
The Map function is applied in parallel to every pair (keyed by k1) in the input dataset. This produces a list of pairs (keyed by k2) for each call. After that, the MapReduce framework collects all pairs with the same key (k2) from all lists and groups them together, creating one group for each key.
Reduce
is then applied in parallel to each group, which in turn produces a collection of values in the same domain:
Reduce(k2, list(v2)) → list(v3)
Each Reduce call typically produces either one value v3 or an empty return, though one call is allowed to return more than one value. The returns of all calls are collected as the desired result list.
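To make the model concrete, here is the classic word-count example expressed with these two functions in pure Python. This is only a sketch of the programming model, not the distributed Hadoop implementation; the run_mapreduce driver below is a hypothetical stand-in for the framework (the grouping step):
In [ ]:
from collections import defaultdict

def map_fn(k1, v1):
    # (document name, text) -> list of (word, 1) pairs
    return [(word, 1) for word in v1.split()]

def reduce_fn(k2, values):
    # (word, [1, 1, ...]) -> [(word, count)]
    return [(k2, sum(values))]

def run_mapreduce(records, map_fn, reduce_fn):
    # Hypothetical single-machine driver: apply Map, group by key, apply Reduce
    groups = defaultdict(list)
    for k1, v1 in records:
        for k2, v2 in map_fn(k1, v1):
            groups[k2].append(v2)
    results = []
    for k2, values in groups.items():
        results.extend(reduce_fn(k2, values))
    return results

run_mapreduce([('doc1', 'to be or not to be')], map_fn, reduce_fn)
# -> [('to', 2), ('be', 2), ('or', 1), ('not', 1)]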
Apache Spark
is an open-source cluster-computing framework that can run on top of Apache Hadoop. Built on top of MapReduce, it offers an improved interface for non-linear algorithms and operations. Apache Spark is based on a specialized data structure called the resilient distributed dataset (RDD). The use of RDDs facilitates the implementation of iterative algorithms and interactive/exploratory analysis. The latency of Spark applications, compared to a pure MapReduce implementation, may be reduced by several orders of magnitude.
Apache Spark comprises several modules that add processing capabilities on top of RDDs, such as Spark SQL, which we will use later in this notebook.
For this course, we will introduce the mechanics of working with large datasets using Spark. Ideally, each of you would have an entire Hadoop cluster to work with but, as we are not CERN, we will make use of Spark's ability to run locally, without a cluster. Later, you could run the same code on top of a Hadoop cluster without changing anything.
In [ ]:
import pyspark

# Create a Spark context that runs locally, using all available cores
sc = pyspark.SparkContext('local[*]')
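As an aside, this is the only line that would change when running against a real Hadoop cluster; for example, on a YARN-managed cluster (a hypothetical setup that requires the Hadoop client configuration to be available) it could look like the commented line below:
In [ ]:
# Hypothetical cluster setup; left commented out because only one SparkContext
# can be active at a time, and it needs a configured Hadoop/YARN client.
# sc = pyspark.SparkContext(master='yarn')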
In [ ]:
# We define our input
l = range(10)
l
In [ ]:
# We "upload" it as an RDD
rdd = sc.parallelize(l)
rdd
In [ ]:
# We define a map function
def power_of_2(k):
    return 2**k
# And we apply it to our RDD
rdd.map(power_of_2)
In [ ]:
# map() only defines the transformation (Spark is lazy), so we use collect() to retrieve all results.
rdd.map(power_of_2).collect()
### WARNING ###
# Never do that in real cases, or you will transfer ALL the data to your browser, effectively killing it.
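When you only need to inspect a few results, a safer habit is to ask for a handful of elements instead of the whole dataset, using the take() action (the same action used later in this notebook):
In [ ]:
# Retrieve only the first few results instead of transferring the full dataset
rdd.map(power_of_2).take(3)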
In [ ]:
# What about summing everything?
# We define a reduce function
def sum_everything(k1, k2):
    return k1 + k2
# And we apply the reduce operation
rdd.reduce(sum_everything)
In [ ]:
# Or we can use the built-in operation `sum`
rdd.sum()
In [ ]:
# What if I wanted to compute the sum of the powers of 2?
rdd.map(power_of_2).reduce(sum_everything)
# or
rdd.map(power_of_2).sum()
In [ ]:
# How can we count the number of elements in the array?
rdd.count()
Ok, too easy, this is supposed to be a map & reduce tutorial...
How can we do it WITHOUT the count() action, just using map & reduce?
SPOILER: you could add a 1 for each element in the RDD and then apply the sum_everything reduce function.
In [ ]:
def set_to_1(k):
    return 1
rdd.map(set_to_1).reduce(sum_everything)
In [ ]:
# Load all Shakespeare works
import os
shakespeare = sc.textFile(os.path.normpath('file:///../../resources/shakespeare.txt'))
In [ ]:
# Show the first lines
shakespeare.take(10)
In [ ]:
# Get the longest line
def keep_longest(k1, k2):
    if len(k1) > len(k2):
        return k1
    else:
        return k2
shakespeare.reduce(keep_longest)
In [ ]:
# Compute the average line length
def line_length(k):
    return len(k)
shakespeare.map(line_length).sum() / shakespeare.count()
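As an aside, RDDs also provide a built-in mean() action that computes the same average in a single step, analogous to the built-in sum() used above:
In [ ]:
# The same average line length, using the built-in mean() action
shakespeare.map(line_length).mean()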
In [ ]:
# Split the text in words
def split_in_words(k):
    return k.split()
shakespeare.map(split_in_words).take(2)
In [ ]:
shakespeare.flatMap(split_in_words).take(15)
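Note the difference: map() keeps one list of words per line, whereas flatMap() flattens those lists into a single RDD of words, which is what we want here. The same thing can be written more compactly with a lambda: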
In [ ]:
shakespeare.flatMap(
    lambda k: k.split()  # Split in words
).take(15)
In [ ]:
# Retrieve 10 words longer than 15 characters
shakespeare.flatMap(
    lambda k: k.split()  # Split in words
).filter(
    lambda k: len(k) > 15  # Keep words longer than 15 characters
).take(10)
In [ ]:
%load -r 1-9 solutions/13_01_Big_Data.py
In [ ]:
%load -r 10-19 solutions/13_01_Big_Data.py
In [ ]:
%load -r 20-29 solutions/13_01_Big_Data.py
In [ ]:
%load -r 30-39 solutions/13_01_Big_Data.py
Which, as you all know, means "the state of being able to achieve honours".
In [ ]:
words = shakespeare.flatMap(
    lambda k: k.split()  # Split in words
).filter(
    lambda k: not (set('.,-') & set(k))  # Drop words with special characters
)
In [ ]:
words.groupBy(lambda k: k).take(10)
In [ ]:
# groupBy returns an iterable for each distinct word, containing all the appearances of that word.
# Let's print its contents
tuples = words.groupBy(lambda k: k).take(5)
for t in tuples:
    print(t[0], list(t[1]))
In [ ]:
# Now, to compute the number of appearances, we just have to count the elements in each iterable
words.groupBy(
    lambda k: k
).map(
    lambda t: (t[0], len(list(t[1])))
).take(5)
In [ ]:
# But this is VERY EXPENSIVE in terms of memory,
# as all the word instances must be stored in a list before they can be counted.
# We can do it much better!
In [ ]:
words.map(
    lambda w: (w, 1)
).take(10)
In [ ]:
words.map(
    lambda w: (w, 1)
).reduceByKey(
    lambda k1, k2: k1 + k2
).take(10)
In [ ]:
%load -r 40-49 solutions/13_01_Big_Data.py
In [ ]:
%load -r 50-69 solutions/13_01_Big_Data.py
In [ ]:
%load -r 70-79 solutions/13_01_Big_Data.py
In [ ]:
from pyspark.sql import SQLContext

# Create an SQL context on top of the Spark context, to work with structured data (DataFrames)
sqlc = SQLContext(sc)
In [ ]:
# Read the Gaia CSV file into a DataFrame, letting Spark infer the column types from the data
gaia = sqlc.read.csv('../resources/gaia.csv.bz2', comment='#', header=True, inferSchema=True)
gaia
In [ ]:
gaia.count()
In [ ]:
gaia.head(5)
In [ ]:
%matplotlib inline
import pyspark.sql.functions as func
In [ ]:
# Histogram of the G magnitude: bin the values in steps of 0.1 mag and count the stars per bin
g_hist = gaia.groupBy(
    (
        func.floor(gaia.mag_g * 10) / 10
    ).alias('mag_g'),
).count().orderBy(
    'mag_g'
)
g_hist.take(10)
In [ ]:
g_hist.toPandas().set_index('mag_g').plot(loglog=True)
In [ ]:
%load -r 90-99 solutions/13_01_Big_Data.py
In [ ]:
# The same 0.1 mag histogram, this time expressed in SQL
sqlc.registerDataFrameAsTable(gaia, "gaia")
g_hist = sqlc.sql("""
SELECT CAST(FLOOR(mag_g*10)/10. AS FLOAT) AS mag_g, COUNT(*) AS `count`
FROM gaia
GROUP BY 1
ORDER BY 1
""")
g_hist.take(10)
In [ ]:
g_hist.toPandas().set_index('mag_g').plot(loglog=True)
In [ ]:
%load -r 100-109 solutions/13_01_Big_Data.py