MapReduce using Spark


In [1]:
%pylab inline
import pandas as pd
import seaborn as sns
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)


Populating the interactive namespace from numpy and matplotlib

With shameless stealing of some code and text from:

which you should go check out.

Installing Spark locally

Step 1: Install Apache Spark

For example, for Mac users using Homebrew:

$ brew install apache-spark

Step 2: Install a Java JDK, version 1.8 or above, for your platform (not just the JRE runtime)

Make sure you can access commands such as java on your command line.

Step 3: Install the latest findspark package using pip

➜  ~  pip install findspark
Collecting findspark
  Downloading findspark-0.0.5-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-0.0.5

Spark Context

You can also use Spark directly from the notebook interface on a Mac if you installed apache-spark using brew and installed the findspark package as above.


In [2]:
import findspark
# point findspark at your Spark installation (a Windows path is used here;
# adjust it, or call findspark.init() with no argument if SPARK_HOME is set)
findspark.init(r'c:\spark')
import pyspark
sc = pyspark.SparkContext()

In [3]:
sc


Out[3]:
<pyspark.context.SparkContext at 0x7feaeb0>

In [4]:
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()


Out[4]:
385

Create an RDD


In [5]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print type(wordsRDD)


<class 'pyspark.rdd.RDD'>

Call collect on an RDD: Lazy Spark

Spark is lazy: until you call an action such as collect, nothing is actually run.

Transformations don't compute their results right away. Instead, Spark just remembers the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.


In [6]:
wordsRDD.collect()


Out[6]:
['cat', 'elephant', 'rat', 'rat', 'cat']

Operations on RDDs

From the Spark Programming Guide:

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
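
As a quick illustration of that distinction, here is a minimal sketch (the RDD and variable names are just illustrative; plain reduce is not used elsewhere in this notebook): map returns a new RDD immediately without touching the data, while reduce actually runs the computation and returns a plain Python value to the driver.

nums = sc.parallelize([1, 2, 3, 4, 5])

# map is a transformation: it returns a new RDD and runs nothing yet
squares = nums.map(lambda x: x * x)

# reduce is an action: it triggers the computation and returns a value
total = squares.reduce(lambda a, b: a + b)
print(total)  # 55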

Word Examples


In [7]:
def makePlural(word):
    return word + 's'

print makePlural('cat')


cats

Transform one RDD into another.


In [8]:
pluralRDD = wordsRDD.map(makePlural)
print pluralRDD.first()
print pluralRDD.take(2)


cats
['cats', 'elephants']

In [9]:
pluralRDD.take(1)


Out[9]:
['cats']

In [10]:
pluralRDD.collect()


Out[10]:
['cats', 'elephants', 'rats', 'rats', 'cats']

Key Value Pairs


In [11]:
wordPairs = wordsRDD.map(lambda w: (w, 1))
print wordPairs.collect()


[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]

WORD COUNT!

This little exercise shows how to use MapReduce to calculate the count of each individual word in a list.


In [12]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD
                       .map(lambda w: (w, 1))
                       .reduceByKey(lambda x,y: x+y)
                       .collect())
print wordCountsCollected


[('rat', 2), ('elephant', 1), ('cat', 2)]


In [13]:
print (wordsRDD
    .map(lambda w: (w, 1))
    .reduceByKey(lambda x,y: x+y)).toDebugString()


(4) PythonRDD[19] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[18] at mapPartitions at PythonRDD.scala:346 []
 |  ShuffledRDD[17] at partitionBy at null:-2 []
 +-(4) PairwiseRDD[16] at reduceByKey at <ipython-input-13-c1214cba7909>:3 []
    |  PythonRDD[15] at reduceByKey at <ipython-input-13-c1214cba7909>:3 []
    |  ParallelCollectionRDD[9] at parallelize at PythonRDD.scala:396 []

Using Cache


In [14]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
print wordsRDD
wordsRDD.count()


ParallelCollectionRDD[20] at parallelize at PythonRDD.scala:396
Out[14]:
5

Normally, every action is recomputed from the start of the RDD's lineage, which can be inefficient. So, when appropriate, we may want to cache the result the first time an operation is run on an RDD.


In [15]:
#this is rerun from the start
wordsRDD.count()


Out[15]:
5

In [16]:
#default storage level (MEMORY_ONLY)
wordsRDD.cache()#nothing done this is still lazy


Out[16]:
ParallelCollectionRDD[20] at parallelize at PythonRDD.scala:396

In [17]:
#parallelize is rerun and cached because we told it to cache
wordsRDD.count()


Out[17]:
5

In [18]:
#this `sc.parallelize` is not rerun in this case
wordsRDD.count()


Out[18]:
5

Where is this useful? When your computation has branching parts or loops, so that you don't do the same work again and again: Spark, being lazy, would otherwise rerun the whole chain each time. So cache or persist serves as a checkpoint, breaking the RDD chain (the lineage).
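
cache() uses the default storage level, while persist() lets you choose a different one. A minimal sketch (the RDD and the choice of storage level here are just illustrative):

from pyspark import StorageLevel

# like cache(), persist() is lazy: nothing is stored until an action runs.
# MEMORY_AND_DISK spills partitions to disk if they do not fit in memory.
bigRDD = sc.parallelize(range(1000), 4)
bigRDD.persist(StorageLevel.MEMORY_AND_DISK)
bigRDD.count()   # first action populates the cache
bigRDD.count()   # subsequent actions reuse it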


In [19]:
birdsList=['heron','owl']
animList=wordsList+birdsList
animaldict={}
for e in wordsList:
    animaldict[e]='mammal'
for e in birdsList:
    animaldict[e]='bird'
animaldict


Out[19]:
{'cat': 'mammal',
 'elephant': 'mammal',
 'heron': 'bird',
 'owl': 'bird',
 'rat': 'mammal'}

In [20]:
animsrdd = sc.parallelize(animList, 4)
animsrdd.cache()
#below runs the whole chain but causes cache to be populated
mammalcount=animsrdd.filter(lambda w: animaldict[w]=='mammal').count()
#now only the filter is carried out
birdcount=animsrdd.filter(lambda w: animaldict[w]=='bird').count()
print mammalcount, birdcount


5 2

In [ ]:

Exercises: Fun with MapReduce

Read http://spark.apache.org/docs/latest/programming-guide.html for some useful background, and then try out the following exercises.

The file ./sparklect/english.stop.txt contains a list of English stopwords, while the file ./sparklect/shakes/juliuscaesar.txt contains the entire text of Shakespeare's 'Julius Caesar'.

  • Load all of the stopwords into a Python list
  • Load the text of Julius Caesar into an RDD using the sparkcontext.textFile() method. Call it juliusrdd.

In [21]:
# your turn
# load the stopwords with pandas; note that read_csv treats the first line of
# the file as a header row, so the very first stopword becomes the column name
stopw = pd.read_csv('./sparklect/english.stop.txt')
# stop and recreate the SparkContext, then load the play as an RDD of lines
sc.stop()
sc = pyspark.SparkContext()
juliusrdd = sc.textFile('./sparklect/shakes/juliuscaesar.txt')
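
The exercise asks for the stopwords as a plain Python list; a minimal sketch without pandas (assuming one stopword per line in the file) is below, though the rest of this notebook keeps using the pandas stopw DataFrame loaded above. The name stopwords is just illustrative.

# read the stopword file directly into a Python list, one word per line
with open('./sparklect/english.stop.txt') as f:
    stopwords = [line.strip() for line in f if line.strip()]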

How many words does Julius Caesar have? Hint: use flatMap().


In [22]:
# your turn
counts = juliusrdd.flatMap(lambda l: l.split(' ')).map(lambda w: (w,1)).reduceByKey(lambda a,b: a+b).collect()
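
Note that the cell above builds per-word counts (reused below). To answer the question as posed, the total number of words, a minimal sketch might be (total_words is just an illustrative name):

# split each line on spaces and flatten, then count the resulting tokens
# (this includes empty tokens produced by blank lines and double spaces)
total_words = juliusrdd.flatMap(lambda line: line.split(' ')).count()
print(total_words)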

Now print the first 20 words of Julius Caesar as a Python list.


In [23]:
# your turn
# note: range(0, 19) stops at 18, so this prints only 19 entries;
# range(20) would give the first 20
for i in range(0,19):
    print counts[i]


(u'', 12571)
(u'fawn', 2)
(u'pardon', 2)
(u'shouted.', 1)
(u'silent,', 1)
(u'foul', 1)
(u'four', 1)
(u'Leap', 1)
(u'sleep', 5)
(u"friend's", 1)
(u'Publius,', 4)
(u'Publius.', 2)
(u'hate', 1)
(u'up.', 3)
(u'up,', 3)
(u'pardon!', 1)
(u'venom', 1)
(u'presents', 1)
(u'whatsoever', 1)

Now print the first 20 words of Julius Caesar, after removing all the stopwords. Hint: use filter().


In [24]:
# your turn

def notStopWord(w):
    # wrap the (ascii-encoded) word in a one-element list so we can use
    # the pandas DataFrame.isin check against the stopw DataFrame above
    l = [w.encode('ascii', 'replace')]
    # drop empty tokens from the split; 'a' is handled explicitly because it
    # is missing from stopw (most likely eaten as the header row by read_csv)
    if (w == u'') | (w == u'a'):
        return False
    # `any` here is numpy's any (imported by %pylab), so it reduces over the
    # whole boolean DataFrame and `~` negates the resulting numpy bool:
    # True when w is NOT a stopword
    return ~any(stopw.isin(l))

counts = (juliusrdd
        .flatMap(lambda l: l.lower().split(' '))
        .map(lambda w: w.strip(".").strip(",").strip('"').strip("'"))
        .filter(lambda w: notStopWord(w))
        .map(lambda w: (w,1))
        .reduceByKey(lambda a,b: a+b)
        .collect())
for i in range(0,19):
    print counts[i]


(u'fawn', 2)
(u'pardon', 9)
(u'hats', 1)
(u'foul', 1)
(u'straws', 1)
(u'sleep', 7)
(u"friend's", 1)
(u'appetite', 1)
(u'hate', 1)
(u'vile', 8)
(u'pardon!', 1)
(u'tween', 2)
(u'whatsoever', 1)
(u'sway', 1)
(u'reveler!', 1)
(u'today?', 1)
(u'void', 1)
(u'today;', 1)
(u'stabbed', 1)

Now, use the word counting MapReduce code you've seen before. Count the number of times each word occurs and print the top 20 results as a list of tuples of the form (word, count). Hint: use takeOrdered() instead of take()


In [25]:
# your turn

counts = (juliusrdd
        .flatMap(lambda l: l.lower().split(' '))
        .map(lambda w: w.strip(".").strip(",").strip('"').strip("'"))
        .filter(lambda w: notStopWord(w))
        .map(lambda w: (w,1))
        .reduceByKey(lambda a,b: a+b))
top20 = counts.takeOrdered(20, key = lambda x: -x[1])

Plot a bar graph. For each of the top 20 words on the X axis, represent the count on the Y axis.


In [26]:
# your turn
import matplotlib.pyplot as plt

# unpack the (word, count) tuples and draw a horizontal bar plot,
# with the counts on the x axis and the words on the y axis
y, x = zip(*top20)
ax = sns.barplot(x, y)
ax.set(xlabel='Counts', ylabel='Word')
ax.set_title('Most common words in Shakespeare\'s Julius Caesar')
plt.show()


Using partitions for parallelization

In order to make your code more efficient, you want to use all of the available processing power, even on a single laptop. If your machine has multiple cores, you can tune the number of partitions to use all of them! From http://www.stat.berkeley.edu/scf/paciorek-spark-2014.html:

You want each partition to be able to fit in the memory available on a node, and if you have multi-core nodes, you want as many partitions as there are cores to be able to fit in memory at once.

For load-balancing you'll want at least as many partitions as total computational cores in your cluster, and probably rather more partitions. The Spark documentation suggests 2-4 partitions (which they also seem to call slices) per CPU. Often there are 100-10,000 partitions. Another rule of thumb is that tasks should take at least 100 ms; if they take less than that, you may want to repartition to have fewer partitions.
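
To inspect or change how an RDD is partitioned you can use getNumPartitions(), repartition() and coalesce(). A minimal sketch (the partition counts chosen here are arbitrary):

rdd = sc.parallelize(range(10000))
print(rdd.getNumPartitions())    # how many partitions Spark chose by default

rdd8 = rdd.repartition(8)        # full shuffle into 8 partitions
rdd2 = rdd8.coalesce(2)          # merge down to 2 partitions without a full shuffle
print(rdd8.getNumPartitions())
print(rdd2.getNumPartitions())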


In [27]:
shakesrdd=sc.textFile("./sparklect/shakes/*.txt", minPartitions=4)

In [28]:
shakesrdd.take(10)


Out[28]:
[u'\ufeff1601',
 u'AS YOU LIKE IT',
 u'',
 u'by William Shakespeare',
 u'',
 u'',
 u'',
 u'DRAMATIS PERSONAE.',
 u'',
 u'']

Now calculate the top 20 words in all of the files that you just read.


In [29]:
# your turn

counts = (shakesrdd
        .flatMap(lambda l: l.lower().split(' '))
        .map(lambda w: w.strip(".").strip(",").strip('"').strip("'"))
        .filter(lambda w: notStopWord(w))
        .map(lambda w: (w,1))
        .reduceByKey(lambda a,b: a+b))
top20 = counts.takeOrdered(20, key = lambda x: -x[1])
print top20


[(u'thou', 2093), (u'thy', 1548), (u'good', 1173), (u'thee', 1112), (u'sir', 1103), (u'lord', 876), (u'love', 825), (u'hath', 788), (u'man', 665), (u'make', 651), (u'enter', 624), (u'tis', 609), (u'give', 568), (u'duke', 539), (u'king', 535), (u'speak', 475), (u'brutus', 459), (u'time', 440), (u'mine', 379), (u'hear', 364)]

Optional topic 1: DataFrames

Pandas and Spark dataframes can be easily converted to each other, making it easier to work with different data formats. This section shows some examples of each.

Convert Spark DataFrame to Pandas

pandas_df = spark_df.toPandas()

Create a Spark DataFrame from Pandas

spark_df = context.createDataFrame(pandas_df)

The data must fit in memory on the driver for these conversions.

VERY IMPORTANT: DataFrames in Spark are like RDDs in the sense that they are immutable data structures.


In [30]:
df=pd.read_csv("https://dl.dropboxusercontent.com/u/75194/stats/data/01_heights_weights_genders.csv")
df.head()


Out[30]:
Gender Height Weight
0 Male 73.847017 241.893563
1 Male 68.781904 162.310473
2 Male 74.110105 212.740856
3 Male 71.730978 220.042470
4 Male 69.881796 206.349801

Convert this pandas dataframe to a Spark dataframe


In [31]:
from pyspark.sql import SQLContext
sqlsc=SQLContext(sc)
sparkdf = sqlsc.createDataFrame(df)
sparkdf


Out[31]:
DataFrame[Gender: string, Height: double, Weight: double]

In [32]:
sparkdf.show(5)


+------+-----------------+----------------+
|Gender|           Height|          Weight|
+------+-----------------+----------------+
|  Male|  73.847017017515|241.893563180437|
|  Male|68.78190404589029|  162.3104725213|
|  Male|74.11010539178491|  212.7408555565|
|  Male| 71.7309784033377|220.042470303077|
|  Male| 69.8817958611153|206.349800623871|
+------+-----------------+----------------+
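
As noted above, Spark DataFrames are immutable: methods like withColumn return a new DataFrame rather than modifying sparkdf in place. A minimal sketch (HeightM and with_metres are just illustrative names, converting inches to metres):

# withColumn returns a brand-new DataFrame; sparkdf itself is left untouched
with_metres = sparkdf.withColumn('HeightM', sparkdf.Height * 0.0254)
print(with_metres.columns)   # includes 'HeightM'
print(sparkdf.columns)       # unchanged: ['Gender', 'Height', 'Weight']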


In [33]:
type(sparkdf.Gender)


Out[33]:
pyspark.sql.column.Column

In [34]:
temp = sparkdf.map(lambda r: r.Gender)
print type(temp)
temp.take(10)


<class 'pyspark.rdd.PipelinedRDD'>
Out[34]:
[u'Male',
 u'Male',
 u'Male',
 u'Male',
 u'Male',
 u'Male',
 u'Male',
 u'Male',
 u'Male',
 u'Male']

Optional topic 2: Machine Learning using Spark

While we don't go in-depth into machine learning using Spark here, this sample code will help you get started.


In [36]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

Now create a data set from the Spark dataframe


In [38]:
data=sparkdf.map(lambda row: LabeledPoint(row.Gender=='Male',[row.Height, row.Weight]))
data.take(5)


Out[38]:
[LabeledPoint(1.0, [73.8470170175,241.89356318]),
 LabeledPoint(1.0, [68.7819040459,162.310472521]),
 LabeledPoint(1.0, [74.1101053918,212.740855557]),
 LabeledPoint(1.0, [71.7309784033,220.042470303]),
 LabeledPoint(1.0, [69.8817958611,206.349800624])]

In [39]:
data2=sparkdf.map(lambda row: LabeledPoint(row[0]=='Male',row[1:]))
data2.take(1)[0].label, data2.take(1)[0].features


Out[39]:
(1.0, DenseVector([73.847, 241.8936]))

Split the data set into training and test sets


In [40]:
train, test = data.randomSplit([0.7,0.3])
train.cache()
test.cache()


Out[40]:
PythonRDD[44] at RDD at PythonRDD.scala:43

In [41]:
type(train)


Out[41]:
pyspark.rdd.PipelinedRDD

Train the logistic regression model using MLlib


In [42]:
model = LogisticRegressionWithLBFGS.train(train)

In [43]:
model.weights


Out[43]:
DenseVector([-0.255, 0.1053])

Run it on the test data


In [44]:
results = test.map(lambda lp: (lp.label, float(model.predict(lp.features))))
print results.take(10)
type(results)


[(1.0, 1.0), (1.0, 0.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0)]
Out[44]:
pyspark.rdd.PipelinedRDD

Measure accuracy and other metrics


In [45]:
test_accuracy=results.filter(lambda (a,p): a==p).count()/float(results.count())
test_accuracy


Out[45]:
0.9157718120805369

In [46]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(results)

In [47]:
print type(metrics)
metrics.areaUnderROC


<class 'pyspark.mllib.evaluation.BinaryClassificationMetrics'>
Out[47]:
0.9158215375606679

In [48]:
type(model)


Out[48]:
pyspark.mllib.classification.LogisticRegressionModel

In [51]:
#!rm -rf mylogistic.model

In [55]:
#model.save(sc, "mylogistic.model")

The pipeline API (spark.ml) automates a lot of this, allowing us to work directly on DataFrames. Not all of it is supported in Python yet.
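
For reference, a minimal sketch of what such a pipeline might look like for the same Gender vs. Height/Weight problem (the column names 'label' and 'features' and the variable names are illustrative choices, not part of the original notebook):

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

# turn the Gender string into a numeric label column
indexer = StringIndexer(inputCol='Gender', outputCol='label')
# pack the numeric columns into a single feature vector column
assembler = VectorAssembler(inputCols=['Height', 'Weight'], outputCol='features')
lr = LogisticRegression(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[indexer, assembler, lr])
pipeline_model = pipeline.fit(sparkdf)              # fit directly on the DataFrame
predictions = pipeline_model.transform(sparkdf)     # adds a 'prediction' column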

rdd.saveAsTextFile() saves an RDD to a set of text files, writing each element as its string representation.
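
A minimal sketch using the results RDD from above (the output directory name is hypothetical):

# each (label, prediction) pair is written as one line of text;
# Spark creates the directory and writes one part-* file per partition
results.saveAsTextFile('gender_predictions')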


In [56]:
sc.stop()

In [ ]: