With massive data, we need to load, extract, transform and analyze the data on multiple computers to overcome I/O and processing bottlenecks. However, when working on many computers (possibly hundreds or thousands), there is a high risk that one or more nodes will fail. Distributed computing frameworks are designed to handle such failures gracefully, allowing the developer to focus on algorithm development rather than system administration.
The first such widely used open-source framework was Hadoop MapReduce. It provided transparent fault tolerance and popularized the functional programming approach to distributed computing. The Hadoop workflow uses repeated invocations of the following steps (a plain-Python sketch of this pattern follows the list):
load dataset from disk to memory
map function to elements of dataset
reduce results of map to get new aggregate dataset
save new dataset to disk
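To make this pattern concrete, here is a minimal sketch of a map-reduce style word count in plain Python (not Hadoop itself); the input lines and the mapper/reducer names are just illustrative choices.
from collections import defaultdict

def mapper(line):
    # map step: emit a (word, 1) pair for every word on the line
    return [(word, 1) for word in line.split()]

def reducer(pairs):
    # reduce step: sum the counts for each word
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return counts

lines = ['the quick brown fox', 'jumps over the lazy dog']   # stands in for data loaded from disk
mapped = [pair for line in lines for pair in mapper(line)]   # map
counts = reducer(mapped)                                     # reduce
counts['the']                                                # 2; Hadoop would then save the result back to disk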
Hadoop has two main limitations:
every map-reduce cycle writes its results back to disk, so iterative algorithms incur heavy I/O costs
the restriction to the map and reduce constructs results in increased code complexity, since every problem must be tailored to the map-reduce format
Spark is a more recent framework for distributed computing that addresses these limitations by allowing the use of in-memory datasets for iterative computation, and by providing a rich set of functional programming constructs to make the developer's job easier. Spark also provides libraries for common big data tasks, such as running SQL queries, performing machine learning and processing large graph structures.
These notes are based on a local installation of Spark. Usage of Spark on the OIT docker cluster is slightly different.
Note: This is a local installation. That means it is intended for use with a local copy of Jupyter, and is independent of the Docker container on the OIT website. You would access it by opening your local copy of Jupyter (which you presumably installed using the Anaconda distribution), either from the Anaconda launcher or by opening a terminal and typing jupyter notebook. Then you can open this notebook and run it.
If you are not clear about what is being described, please don't attempt this. Use the Docker notebook for pyspark that OIT will provide.
If you want to install locally, see the instructions at http://spark.apache.org/downloads.html. It is simplest to use the provided binaries:
cd ~
wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
tar xzf spark-2.1.0-bin-hadoop2.7.tgz
mv spark-2.1.0-bin-hadoop2.7 spark
Next, install py4j:
pip install py4j
Finally, edit your .bashrc or .bash_profile to include:
export SPARK_HOME=~/spark
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
# ${PACKAGES} is a placeholder for any extra Spark packages you want pyspark to load;
# if you need none, it may be simplest to set PYSPARK_SUBMIT_ARGS="pyspark-shell" instead
export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"
In [1]:
from pyspark import SparkContext
sc = SparkContext(master='local[*]')
If you are running Spark locally, you can check the status of your Spark environment and jobs using the Spark UI, which is served by default at http://localhost:4040.
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Here we set it up to use the local machine as the cluster: the argument local[*]
means to use as many worker threads as there are cores. You can also set the number of worker threads explicitly with local[k]
where k
is an integer.
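As an illustration only (the thread count and application name below are arbitrary choices, and the context above is already running), you could configure these settings explicitly through a SparkConf:
In [ ]:
from pyspark import SparkConf, SparkContext

# 'local[4]' pins four worker threads; 'spark-notes' is an arbitrary application name
conf = SparkConf().setMaster('local[4]').setAppName('spark-notes')
# To use it, stop the running context first and recreate it:
# sc.stop()
# sc = SparkContext(conf=conf)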
In [2]:
sc.version
Out[2]:
In [3]:
sc.pythonVer
Out[3]:
In [4]:
sc.master
Out[4]:
In [5]:
sc.sparkUser()
Out[5]:
In [6]:
sc.appName
Out[6]:
In [7]:
sc.defaultParallelism
Out[7]:
In [8]:
rdd = sc.parallelize(range(10))
In [9]:
rdd.getNumPartitions()
Out[9]:
In [10]:
rdd.count()
Out[10]:
In [11]:
rdd.sum()
Out[11]:
In [12]:
rdd.min(), rdd.max(), rdd.stdev(), rdd.variance()
Out[12]:
In [13]:
rdd.stats()
Out[13]:
In [14]:
rdd.first()
Out[14]:
In [15]:
rdd.top(3)
Out[15]:
In [16]:
rdd.take(3)
Out[16]:
In [17]:
rdd.sample(withReplacement=False, fraction=.5, seed=2).collect()
Out[17]:
In [19]:
res = rdd.sample(withReplacement=False, fraction=.5, seed=2)
In [20]:
res.collect()
Out[20]:
In [21]:
rdd.collect()
Out[21]:
In [22]:
rdd.glom().collect()
Out[22]:
In [23]:
xs = rdd.filter(lambda x: x % 2 == 0)
xs.collect()
Out[23]:
In [24]:
xs = rdd.map(lambda x: x**2)
xs.collect()
Out[24]:
In [25]:
xs = rdd.flatMap(lambda x: (x, x*x))
xs.collect()
Out[25]:
In [27]:
rdd.fold(0, lambda a, b: a + b)
Out[27]:
In [29]:
rdd.reduce(lambda a, b: a + b)
Out[29]:
Count
In [30]:
rdd.aggregate(0, lambda acc, _: acc + 1, lambda a, b: a+b)
Out[30]:
Sum
In [31]:
rdd.aggregate(0, lambda a, b: a + b, lambda a, b: a + b)
Out[31]:
Think of aggregate as first doing a transform and then a reduce: the second argument (the seqOp) folds each element into an accumulator within a partition, and the third argument (the combOp) merges the per-partition accumulators. A plain-Python analogue appears after the word-length example below.
In [32]:
ss = sc.parallelize('the quick brown fox jumps over the lazy dog'.split())
In [33]:
ss.aggregate(0, lambda acc, s: acc + len(s), lambda a, b: a + b)
Out[33]:
In [34]:
len(''.join('the quick brown fox jumps over the lazy dog'.split()))
Out[34]:
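To make the seqOp/combOp split concrete, here is a plain-Python analogue of the word-length example above; splitting the words into two chunks is an arbitrary stand-in for Spark's partitions.
In [ ]:
from functools import reduce

words_local = 'the quick brown fox jumps over the lazy dog'.split()
parts = [words_local[:4], words_local[4:]]                             # pretend partitions
per_part = [reduce(lambda acc, s: acc + len(s), p, 0) for p in parts]  # seqOp within each partition
reduce(lambda a, b: a + b, per_part, 0)                                # combOp across partitions; gives 35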
In [35]:
(rdd.
map(lambda x: x+1).
fold(1, lambda a, b: a*b))
Out[35]:
In [36]:
(rdd.
map(lambda x: x+1).
reduce(lambda a, b: a*b))
Out[36]:
In [37]:
(
sc.parallelize(range(10))
.filter(lambda x: x % 2 == 0)
.map(lambda x: x**2)
.collect()
)
Out[37]:
In [38]:
import string
In [39]:
rdd = sc.parallelize(zip(2*string.ascii_lowercase[:5], range(10)))
In [40]:
rdd.collect()
Out[40]:
In [41]:
rdd.keys().collect()
Out[41]:
In [42]:
rdd.values().collect()
Out[42]:
In [43]:
d = rdd.collectAsMap()
list(d.items())[:5]
Out[43]:
In [44]:
ys = rdd.mapValues(lambda x: x**2)
ys.collect()
Out[44]:
In [45]:
rdd.countByKey()
Out[45]:
In [46]:
rdd.countByValue()
Out[46]:
In [47]:
ys = rdd.reduceByKey(lambda x, y: x + y)
ys.collect()
Out[47]:
In [48]:
ulysses = sc.textFile('data/Ulysses.txt')
In [49]:
ulysses.take(10)
Out[49]:
In [50]:
def tokenize(line):
    # build a table mapping every punctuation character to None,
    # then strip punctuation, lowercase, and split on whitespace
    table = dict.fromkeys(map(ord, string.punctuation))
    return line.translate(table).lower().split()
In [51]:
words = ulysses.flatMap(lambda line: tokenize(line))
words.take(10)
Out[51]:
In [52]:
counts = words.map(lambda x: (x, 1)).countByKey()
sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]
Out[52]:
In [53]:
words1 = words.map(lambda x: (x, 1))
words1.take(5)
Out[53]:
In [54]:
counts = words1.reduceByKey(lambda a, b: a + b)
counts.take(5)
Out[54]:
In [55]:
counts.takeOrdered(10, key=lambda x: -x[1])
Out[55]:
In [56]:
(
sc.textFile('data/Ulysses.txt').
flatMap(lambda line: tokenize(line)).
map(lambda x: (x, 1)).
reduceByKey(lambda a, b: a + b).
takeOrdered(10, key=lambda x: -x[1])
)
Out[56]:
In [57]:
counts.is_cached
Out[57]:
In [58]:
counts.persist()
Out[58]:
In [59]:
counts.is_cached
Out[59]:
In [60]:
counts.takeOrdered(5, lambda x: -x[1])
Out[60]:
In [61]:
counts.takeOrdered(5, lambda x: x[1])
Out[61]:
In [62]:
count_dict = counts.collectAsMap()
count_dict['circle']
Out[62]:
In [63]:
portrait = sc.textFile('data/Portrait.txt')
In [64]:
portrait.take(10)
Out[64]:
In [65]:
counts1 = (
portrait.flatMap(lambda line: tokenize(line))
.map(lambda x: (x, 1))
.reduceByKey(lambda x,y: x+y)
)
In [66]:
counts1.persist()
Out[66]:
In [67]:
counts1.take(5)
Out[67]:
In [68]:
joined = counts.join(counts1)
In [69]:
joined.take(5)
Out[69]:
In [70]:
joined2 = joined.mapValues(lambda x: x[0] + x[1])
In [71]:
joined2.take(5)
Out[71]:
In [72]:
import numpy as np

joined3 = joined.mapValues(lambda x: np.mean(x))
In [73]:
joined3.take(5)
Out[73]:
In [ ]: