In [1]:
# %load ../../preconfig.py
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(color_codes=True)
plt.rcParams['axes.grid'] = False
import numpy as np
#import pandas as pd
#import sklearn
#import itertools
import logging
logger = logging.getLogger()
"big-data" analysis:
manage immense amounts of data quickly.
data is extremely regular $\to$ exploit parallelism.
new software stack:
"distributed file system" $\to$ MapReduce
When designing MapReduce algorithms, we often find that the greatest cost is in the communication.
cluster computing: the new parallel-computing architecture.
racks: compute nodes are stored on racks.
communication:
The nodes on a single rack are connected by a network, typically gigabit Ethernet, and racks are connected by another level of network or a switch.
The bandwidth of inter-rack communication is somewhat greater than the intrarack Ethernet.
In [3]:
plt.imshow(plt.imread('./res/fig2_1.png'))
Out[3]:
the solution for component failures (loss of a node or a rack):
Files must be stored redundantly.
Computations must be divided into tasks.
DFS: distributed file system
Google File System (GFS)
Hadoop Distributed File System (HDFS)
CloudStore
It is typically used as follows:
Files can be enormous, possibly a terabyte in size.
Files are rarely updated.
how the DFS manages files:
Files are divided into chunks.
Chunks are replicated at different compute nodes of different racks.
master node or name node: a small file, kept per file, that records where to find that file's chunks.
The master node is itself replicated, and a directory for the file system as a whole knows where to find its copies.
The directory itself can be replicated, and all participants using the DFS know where the directory copies are.
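A minimal sketch (not a real DFS) of the chunking-plus-replication idea: split a file into chunks and place each chunk's replicas on nodes drawn from different racks. The chunk size, replication factor, and node list below are illustrative choices, not GFS/HDFS defaults.
In [ ]:
import itertools

def place_chunks(file_bytes, nodes_by_rack, chunk_size, replication=3):
    # Split file_bytes into chunks and assign each chunk to `replication`
    # nodes, cycling over racks so replicas land on different racks
    # (as long as there are at least `replication` racks).
    rack_cycle = itertools.cycle(nodes_by_rack)
    placement = []
    for offset in range(0, len(file_bytes), chunk_size):
        chunk = file_bytes[offset:offset + chunk_size]
        replicas = []
        for _ in range(replication):
            rack = next(rack_cycle)
            # pick the least-loaded node on the chosen rack
            node = min(nodes_by_rack[rack], key=lambda n: n['load'])
            node['load'] += len(chunk)
            replicas.append(node['name'])
        placement.append({'offset': offset, 'replicas': replicas})
    return placement

nodes = {
    'rack-1': [{'name': 'n11', 'load': 0}, {'name': 'n12', 'load': 0}],
    'rack-2': [{'name': 'n21', 'load': 0}, {'name': 'n22', 'load': 0}],
    'rack-3': [{'name': 'n31', 'load': 0}],
}
print(place_chunks(b'x' * 25, nodes, chunk_size=10))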
All you need to write are two functions, called Map and Reduce.
a MapReduce computation executes as follows:
Map function: Map tasks turn the chunks they are given into sequences of key-value pairs.
The key-value pairs from each Map task are collected by a master controller and sorted by key.
divide by key: all pairs with same key $\to$ same Reduce task.
Reduce function: The Reduce tasks work on one key at a time, and combine all the values associated with that key in some way.
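A minimal in-memory sketch of how Map, the master controller's grouping by key, and Reduce fit together, using word counting as the example; the function names and the tiny driver are illustrative, not part of any real MapReduce framework.
In [ ]:
from collections import defaultdict

def map_fn(chunk):
    # Map: turn a chunk of text into a sequence of (key, value) pairs.
    for word in chunk.split():
        yield (word.lower(), 1)

def reduce_fn(key, values):
    # Reduce: combine all the values associated with one key.
    return (key, sum(values))

def map_reduce(chunks, map_fn, reduce_fn):
    # The "master controller": collect Map output, group it by key,
    # then apply the Reduce function to each key's list of values.
    groups = defaultdict(list)
    for chunk in chunks:
        for key, value in map_fn(chunk):
            groups[key].append(value)
    return [reduce_fn(k, vs) for k, vs in sorted(groups.items())]

print(map_reduce(["to be or not to be", "be quick"], map_fn, reduce_fn))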
In [4]:
plt.imshow(plt.imread('./res/fig2_2.png'))
Out[4]:
reducer: the application of the Reduce function to a single key and its associated list of values.
a Reduce task executes one or more reducers.
why not to execute each reducer a separate Reduce task for maximum parallelism?
There is overhead associated with each task we create.
number of Reduce tasks $<$ number of reducers.
There is often significant variation in the lengths of the value lists for different keys, so different reducers take different amounts of time $\to$ skew.
If keys are sent randomly to a smaller number of Reduce tasks, each task gets a mix of long and short reducers, so the total work per task tends to average out (see the simulation below). Skew can be reduced further by running more Reduce tasks than there are compute nodes, so a long task can occupy a node fully while several short tasks run sequentially on another $\to$ number of Reduce tasks $>$ number of compute nodes.
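The averaging effect is easy to see in a quick simulation: draw heavy-tailed per-reducer workloads (a made-up distribution, purely for illustration) and watch the spread of total work per Reduce task shrink as the number of tasks decreases.
In [ ]:
# Heavy-tailed per-key (per-reducer) workloads, assigned at random to Reduce
# tasks: the fewer the tasks, the closer each task's total is to the average.
# The Pareto workload distribution is made up for illustration.
np.random.seed(0)
key_work = np.random.pareto(1.5, size=10000)

for n_tasks in (10000, 1000, 100):
    task_of_key = np.random.randint(0, n_tasks, size=key_work.size)
    totals = np.bincount(task_of_key, weights=key_work, minlength=n_tasks)
    print('%5d tasks: max/mean load = %.1f' % (n_tasks, totals.max() / totals.mean()))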
In [2]:
plt.imshow(plt.imread('./res/fig2_3.png'))
Out[2]:
We first assume that $n$ is large, but not so large that $\mathbf{v}$ cannot fit in main memory and thus be available to every Map task.
Map: from each matrix element $m_{ij}$, produce the key-value pair $(i, m_{ij} x_j)$.
Reduce: simply sums all the values and produces pair $(i, x_i)$.
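A minimal sketch of this Map and Reduce in plain Python, assuming the matrix arrives as (row, column, value) triples and the whole vector $\mathbf{v}$ fits in memory; the names and the toy driver are illustrative.
In [ ]:
from collections import defaultdict

def mv_map(matrix_chunk, v):
    # Map: from each element m_ij emit the pair (i, m_ij * x_j).
    for i, j, m_ij in matrix_chunk:
        yield (i, m_ij * v[j])

def mv_reduce(i, values):
    # Reduce: sum all the values for row i, producing (i, x_i).
    return (i, sum(values))

M = [(0, 0, 1.0), (0, 1, 2.0), (1, 0, 3.0), (1, 1, 4.0)]   # (row, col, value) triples
v = [10.0, 20.0]

groups = defaultdict(list)
for key, val in mv_map(M, v):
    groups[key].append(val)
print(sorted(mv_reduce(i, vals) for i, vals in groups.items()))   # [(0, 50.0), (1, 110.0)]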
In [3]:
plt.imshow(plt.imread('./res/fig2_4.png'))
Out[3]:
Each Map task is assigned a chunk from one of the stripes of the matrix and gets the entire corresponding stripe of the vector.
A particular application (the PageRank calculation) has an additional constraint: the result vector should be partitioned in the same way as the input vector.
We shall see there that the best strategy involves partitioning the matrix $\mathbf{M}$ into square blocks, rather than stripes.
There are many operations on data that can be described easily in terms of the common database-query primitives.
a relation is a table with column headers called attributes.
Rows of the relation are called tuples.
The set of attributes of a relation is called its schema.
$R(A_1, A_2, \dotsc, A_n)$: the relation name is $R$ and its attributes are $A_1, A_2, \dotsc, A_n$.
In [4]:
plt.imshow(plt.imread('./res/fig2_5.png'))
Out[4]:
relational algebra: several standard operations on relations.
Selection $\sigma_{C} (R)$: select the tuples that satisfy condition $C$.
Projection $\pi_{S} (R)$: keep only the subset $S$ of the attributes.
Union, Intersection, and Difference
Union: tuples that appear in $R$ or $S$ (or both).
Intersection: tuples that appear in both $R$ and $S$.
Difference $S - R$: tuples that appear in $S$ but not in $R$.
Natural Join $R \bowtie S$: join pairs of tuples that agree on all the attributes common to the schemas of $R$ and $S$.
Grouping and Aggregation $\gamma_{X} (R)$: where $X$ consists of grouping attributes and aggregated attributes of the form $\theta(A)$, with $\theta$ an aggregation operator such as SUM, COUNT, AVG, MIN, or MAX.
Example: for $R(A, B, C)$, $\gamma_{A, \theta(B)} (R)$ groups the tuples of $R$ by their $A$-value and, for each group, outputs one tuple consisting of that $A$-value and $\theta$ applied to the group's $B$-values.
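Each of these operations has a simple MapReduce implementation. As one example, here is a sketch of the natural join $R(A, B) \bowtie S(B, C)$: the shared $B$-value is the key, and each tuple is tagged with its relation of origin so the reducer can pair $R$-tuples with $S$-tuples. The function names and toy data are illustrative.
In [ ]:
from collections import defaultdict

def join_map(R, S):
    # Key on the shared B-value, tag each tuple with its relation of origin.
    for a, b in R:
        yield (b, ('R', a))
    for b, c in S:
        yield (b, ('S', c))

def join_reduce(b, tagged):
    # Pair every R-tuple with every S-tuple that shares this B-value.
    a_values = [x for tag, x in tagged if tag == 'R']
    c_values = [x for tag, x in tagged if tag == 'S']
    return [(a, b, c) for a in a_values for c in c_values]

R = [(1, 'x'), (2, 'y')]
S = [('x', 10), ('x', 11), ('z', 12)]

groups = defaultdict(list)
for key, val in join_map(R, S):
    groups[key].append(val)
print([t for b, vals in groups.items() for t in join_reduce(b, vals)])
# -> [(1, 'x', 10), (1, 'x', 11)]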
$\mathbf{M} \times \mathbf{N}$
matrix multiplication can be viewed as a natural join followed by grouping and aggregation.
1st MapReduce: the natural join of $M(I, J, V)$ and $N(J, K, W)$, producing tuples $(i, j, k, v \times w)$.
2nd MapReduce: grouping and aggregation, with $I$ and $K$ as the grouping attributes and the sum of $v \times w$ as the aggregation.
Alternatively, with a single MapReduce step:
Map: for each element $m_{ij}$ of $M$, produce the key-value pairs $\big((i, k), (M, j, m_{ij})\big)$ for every column $k$ of $N$; for each element $n_{jk}$ of $N$, produce $\big((i, k), (N, j, n_{jk})\big)$ for every row $i$ of $M$.
Reduce: each key $(i, k)$ has an associated list with all the values $(M, j, m_{ij})$ and $(N, j, n_{jk})$, for all possible values of $j$; the Reduce function matches the two entries with the same $j$, multiplies $m_{ij}$ by $n_{jk}$, and sums these products to produce $p_{ik}$.
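A small in-memory sketch of this one-step algorithm, with matrices stored as dictionaries from (row, column) to value; the dimensions, names, and driver are illustrative.
In [ ]:
from collections import defaultdict

def mm_map(M, N, n_rows_M, n_cols_N):
    # Keys are (i, k); values are tagged with the source matrix and j.
    for (i, j), m_ij in M.items():
        for k in range(n_cols_N):
            yield ((i, k), ('M', j, m_ij))
    for (j, k), n_jk in N.items():
        for i in range(n_rows_M):
            yield ((i, k), ('N', j, n_jk))

def mm_reduce(key, values):
    # Match the M- and N-entries on j, multiply, and sum.
    m = {j: v for tag, j, v in values if tag == 'M'}
    n = {j: v for tag, j, v in values if tag == 'N'}
    return (key, sum(m[j] * n[j] for j in m if j in n))

M = {(0, 0): 1, (0, 1): 2, (1, 0): 3, (1, 1): 4}
N = {(0, 0): 5, (0, 1): 6, (1, 0): 7, (1, 1): 8}

groups = defaultdict(list)
for key, val in mm_map(M, N, n_rows_M=2, n_cols_N=2):
    groups[key].append(val)
print(sorted(mm_reduce(k, vs) for k, vs in groups.items()))
# -> [((0, 0), 19), ((0, 1), 22), ((1, 0), 43), ((1, 1), 50)]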
In [3]:
plt.imshow(plt.imread('./res/fig2_6.png'))
Out[3]:
In [4]:
plt.imshow(plt.imread('./res/fig2_7.png'))
Out[4]:
In [13]:
def calc_prob(p, t):
    # 10 - 9 * (1 - p)**t  ==  1 * (1 - p)**t + 10 * (1 - (1 - p)**t),
    # i.e. the expected value of a quantity that is 1 with probability
    # (1 - p)**t and 10 otherwise.
    return 10 - 9 * ((1 - p)**t)

p = np.linspace(0, 1, 100)
y1 = calc_prob(p, 10)    # t = 10
y2 = calc_prob(p, 100)   # t = 100
plt.plot(p, y1, 'b', p, y2, 'g')
Out[13]:
The communication cost of a task is the size of the input to the task. We shall often use the number of tuples as a measure of size, rather than bytes.
The communication cost of an algorithm is the sum of the communication cost of all the tasks implementing that algorithm. We shall focus on the communication cost as the way to measure the efficiency of an algorithm, since the exceptions, where execution time of tasks dominates, are rare in practice.
We count only input size, and not output size.
example: $R(A, B) \bowtie S(B, C) \bowtie T(C, D)$.
Suppose that the relations $R$, $S$, and $T$ have sizes $r$, $s$, and $t$, respectively, and for simplicity, suppose $p$ is the probability that a tuple of $R$ and a tuple of $S$ agree on the attribute they share ($B$), and likewise that a tuple of $S$ and a tuple of $T$ agree on $C$.
One option is the cascade of two two-way joins, $\left( R(A, B) \bowtie S(B, C) \right) \bowtie T(C, D)$, or the same with the two joins done in the opposite order:
1st MapReduce: $R(A, B) \bowtie S(B, C)$, communication cost $O(r + s)$; the intermediate result has about $prs$ tuples.
2nd MapReduce: join the intermediate result with $T(C, D)$, communication cost $O(t + prs)$. Total: $O(r + s + t + prs)$.
Assume:
We plan to use $k$ reducers for the job.
$b$ and $c$ represent the numbers of buckets into which we shall hash $B$- and $C$-values, respectively:
$h$ hashes $B$-values into $b$ buckets,
$g$ hashes $C$-values into $c$ buckets,
and we require $bc = k$, so each reducer corresponds to one pair of buckets.
So, the reducer corresponding to bucket pair $(i, j)$ is responsible for joining the tuples $R(u, v), S(v, w)$, and $T(w, x)$ whenever $h(v) = i$ and $g(w) = j$.
$S(v, w)$ to reducer $(h(v), g(w))$. communication cost: $s$
$R(u, v)$ to the $c$ reducers $(h(v), j)$, for each of the $c$ possible bucket numbers $j$. communication cost: $c r$
$T(w, x)$ to the $b$ reducers $(i, g(w))$, for each of the $b$ possible bucket numbers $i$. communication cost: $b t$
There is also a fixed cost $r + s + t$ to make each tuple of each relation be input to one of the Map tasks.
The problem is then: $$\min_{b, c} \; \left( s + cr + bt \right) \quad \text{subject to } bc = k.$$ The solution (e.g., by substituting $c = k/b$ and minimizing over $b$) is $c = \sqrt{kt / r}$ and $b = \sqrt{kr / t}$, so $s + cr + bt = s + 2 \sqrt{k r t}$.
In all, the total communication cost is $r + 2s + t + 2 \sqrt{k r t}$.
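A small sketch comparing the two strategies' communication costs under the formulas above; the sizes $r$, $s$, $t$, the probability $p$, the values of $k$, and the function names are made up for illustration.
In [ ]:
import math

def cascade_cost(r, s, t, p):
    # Cascade of two-way joins: r + s for the first join, then
    # t plus the intermediate result of about p*r*s tuples.
    return r + s + t + p * r * s

def three_way_cost(r, s, t, k):
    # Single three-way join with k reducers: fixed input cost r + s + t,
    # plus s + c*r + b*t at the optimal b, c, i.e. r + 2*s + t + 2*sqrt(k*r*t).
    return r + 2 * s + t + 2 * math.sqrt(k * r * t)

r = s = t = 10**7
p = 1e-5
print('cascade of two-way joins: %.2e' % cascade_cost(r, s, t, p))
for k in (10, 100, 1000):
    print('three-way join, k=%4d : %.2e' % (k, three_way_cost(r, s, t, k)))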
two parameters that characterize families of MapReduce algorithms:
reducer size $q$: the upper bound on the number of values that are allowed to appear in the list associated with a single key. It can be selected with at least two goals in mind:
By making the reducer size small, we force many reducers, and hence a high degree of parallelism.
By making the reducer size small enough, the computation for a single reducer can be carried out entirely in the main memory of a compute node.
replication rate $r$: the number of key-value pairs produced by all the Map tasks on all the inputs, divided by the number of inputs. It is the average communication from Map tasks to Reduce tasks per input.
We are given a large set of elements $X$ and a symmetric similarity measure $s(x, y)$. The output of the algorithm is those pairs whose similarity exceeds a given threshold $t$.
eg: discover similar images in a collection of one million images.
naive solution: one key per pair of pictures, $(\{i, j\}, [P_i, P_j])$.
This algorithm fails completely: the reducer size is small, but the replication rate is 999,999, so the communication cost is extremely large.
We can instead group the pictures into $g$ groups of $10^6 / g$ pictures each and use a pair of group numbers as the key; the replication rate then drops to $g - 1$, while the reducer size grows to $2 \times 10^6 / g$ pictures (see the sketch below).
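A sketch of the Map side of this grouped algorithm, checking the replication rate and reducer size empirically; the group count, `group_of`, and the toy picture set are illustrative. (Pairs of pictures within the same group also have to be covered, e.g. by designating one reducer per group for them; that detail is omitted here.)
In [ ]:
from collections import defaultdict

def sim_map(pictures, g, group_of):
    # Each picture is keyed to the g - 1 group pairs {u, v} containing
    # its own group u, so it is replicated g - 1 times.
    for pic_id, data in pictures:
        u = group_of(pic_id)
        for v in range(g):
            if v != u:
                yield (frozenset((u, v)), (pic_id, data))

n, g = 1000, 10
pictures = [(i, 'pixels-%d' % i) for i in range(n)]

reducers = defaultdict(list)
pairs = 0
for key, value in sim_map(pictures, g, group_of=lambda i: i % g):
    reducers[key].append(value)
    pairs += 1

print('replication rate:', pairs / float(n))                        # g - 1 = 9
print('reducer size    :', max(len(v) for v in reducers.values()))  # 2 * n / g = 200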
In this section, we hope to prove lower bounds on the replication rate. The first step is to introduce a graph model of problems.
For each problem solvable by a MapReduce algorithm there is:
A set of inputs.
A set of outputs.
A many-many relationship between the inputs and outputs, which describes which inputs are necessary to produce which outputs.
In [3]:
plt.imshow(plt.imread('./res/fig2_9.png'))
Out[3]:
A mapping schema expresses how outputs are produced by the various reducers.
A mapping schema for a given problem with a given reducer size $q$ is an assignment of inputs to one or more reducers, such that:
No reducer is assigned more than $q$ inputs.
For every output of the problem, there is at least one reducer that is assigned all the inputs that are related to that output. We say this reducer covers the output.
The minimum possible communication is known if we can prove a matching lower bound on the replication rate.
Here is an outline of the technique:
Prove an upper bound $g(q)$ on how many outputs a reducer with $q$ inputs can cover.
Determine the total number of outputs produced by the problem.
Suppose that there are $k$ reducers, and the $i$th reducer has $q_i \leq q$ inputs. Observe that $\sum_{i=1}^k g(q_i)$ must be no less than the number of outputs computed in step (2).
Manipulate the inequality from (3) to get a lower bound on $\sum_{i=1}^k q_i$. Often, the trick used is to replace some factors of $q_i$ by their upper bound $q$, but leave a single factor of $q_i$ in the term for $i$.
Since $\sum_{i=1}^k q_i$ is the total communication from Map tasks to Reduce tasks, divide the lower bound from (4) on this quantity by the number of inputs. The result is a lower bound on the replication rate.
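As a worked instance of these steps, applied to the similarity-join setting above (a sketch, writing $p$ for the number of inputs and assuming one output per pair of inputs):
A reducer with $q$ inputs can cover at most $g(q) = \binom{q}{2} \leq q^2 / 2$ outputs.
The total number of outputs is $\binom{p}{2} \approx p^2 / 2$.
If the $i$th of $k$ reducers receives $q_i \leq q$ inputs, then $\sum_{i=1}^k q_i^2 / 2 \geq p^2 / 2$.
Since $q_i^2 \leq q \, q_i$, we get $q \sum_{i=1}^k q_i \geq p^2$, i.e. $\sum_{i=1}^k q_i \geq p^2 / q$.
Dividing by the number of inputs $p$ gives the lower bound on the replication rate: $r \geq p / q$.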