In [1]:
%pylab inline
import pandas as pd
import seaborn as sns
pd.set_option('display.width', 500)
pd.set_option('display.max_columns', 100)
With shameless stealing of some code and text from:
which you should go check out.
Step 2: Install the Java JDK, version 1.8 or above, for your platform (not just the JRE runtime). Make sure you can run commands such as java on your command line.
Step 3: Install the latest findspark package using pip
➜ ~ pip install findspark
Collecting findspark
Downloading findspark-0.0.5-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-0.0.5
In [2]:
import findspark
findspark.init(r'c:\spark')  # point findspark at your local Spark installation (use a raw string for Windows paths)
import pyspark
sc = pyspark.SparkContext()
In [3]:
sc
Out[3]:
In [4]:
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()
Out[4]:
In [5]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print type(wordsRDD)
Spark is lazy. Until you call an action such as collect(), nothing is actually run.
Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program.
In [6]:
wordsRDD.collect()
Out[6]:
From the Spark Programming Guide:
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
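To make the distinction concrete, here is a small sketch (not part of the original lab) using the SparkContext created above: the map is merely recorded, and only the reduce action triggers a job.
nums = sc.parallelize([1, 2, 3, 4])
squares = nums.map(lambda x: x**2)          # transformation: nothing is computed yet
total = squares.reduce(lambda a, b: a + b)  # action: runs the job and returns 30 to the driver
print total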
In [7]:
def makePlural(word):
    return word + 's'

print makePlural('cat')
Transform one RDD into another.
In [8]:
pluralRDD = wordsRDD.map(makePlural)
print pluralRDD.first()
print pluralRDD.take(2)
In [9]:
pluralRDD.take(1)
Out[9]:
In [10]:
pluralRDD.collect()
Out[10]:
In [11]:
wordPairs = wordsRDD.map(lambda w: (w, 1))
print wordPairs.collect()
In [12]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
wordCountsCollected = (wordsRDD
                       .map(lambda w: (w, 1))
                       .reduceByKey(lambda x, y: x + y)
                       .collect())
print wordCountsCollected
In [13]:
print (wordsRDD
       .map(lambda w: (w, 1))
       .reduceByKey(lambda x, y: x + y)).toDebugString()
In [14]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
print wordsRDD
wordsRDD.count()
Out[14]:
Normally, every operation is run from the start. This may be inefficient in many cases. So when appropriate, we may want to cache the result the first time an operation is run on an RDD.
In [15]:
#this is rerun from the start
wordsRDD.count()
Out[15]:
In [16]:
#default storage level (MEMORY_ONLY)
wordsRDD.cache()#nothing done this is still lazy
Out[16]:
In [17]:
#parallelize is rerun and cached because we told it to cache
wordsRDD.count()
Out[17]:
In [18]:
#this `sc.parallelize` is not rerun in this case
wordsRDD.count()
Out[18]:
Where is this useful? When your computation has branching parts or loops, so that you don't redo the same work again and again; Spark, being lazy, would otherwise rerun the whole chain. So cache (or persist) serves as a checkpoint, breaking the RDD chain, or lineage.
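As a small sketch of the persist variant mentioned above (the storage level and calls are standard PySpark, but this snippet is illustrative rather than part of the lab):
from pyspark import StorageLevel
biglist = sc.parallelize(range(100000))
biglist.persist(StorageLevel.MEMORY_AND_DISK)  # like cache(), but spills partitions to disk if memory runs out
biglist.count()      # the first action populates the cache
biglist.count()      # later actions reuse the cached partitions
biglist.unpersist()  # release the storage when the RDD is no longer needed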
In [19]:
birdsList = ['heron', 'owl']
animList = wordsList + birdsList
animaldict = {}
for e in wordsList:
    animaldict[e] = 'mammal'
for e in birdsList:
    animaldict[e] = 'bird'
animaldict
Out[19]:
In [20]:
animsrdd = sc.parallelize(animList, 4)
animsrdd.cache()
#below runs the whole chain but causes cache to be populated
mammalcount=animsrdd.filter(lambda w: animaldict[w]=='mammal').count()
#now only the filter is carried out
birdcount=animsrdd.filter(lambda w: animaldict[w]=='bird').count()
print mammalcount, birdcount
In [ ]:
Read http://spark.apache.org/docs/latest/programming-guide.html for some useful background and then try out the following exercises.
The file ./sparklect/english.stop.txt contains a list of English stopwords, while the file ./sparklect/shakes/juliuscaesar.txt contains the entire text of Shakespeare's 'Julius Caesar'.
Load the text of Julius Caesar into an RDD using the sparkcontext.textFile() method. Call it juliusrdd.
In [21]:
# your turn
# read the stopword list (one word per line, no header) into a set for fast membership tests
stopw = set(pd.read_csv('./sparklect/english.stop.txt', header=None)[0].str.strip())
sc.stop()
sc = pyspark.SparkContext()
juliusrdd = sc.textFile('./sparklect/shakes/juliuscaesar.txt')
How many words does Julius Caesar have? Hint: use flatMap().
In [22]:
# your turn
juliuswords = juliusrdd.flatMap(lambda line: line.split(' '))
print juliuswords.count()
Now print the first 20 words of Julius Caesar as a Python list.
In [23]:
# your turn
print juliuswords.take(20)
Now print the first 20 words of Julius Caesar, after removing all the stopwords. Hint: use filter().
In [24]:
# your turn
def notStopWord(w):
    # drop empty tokens (from splitting on repeated spaces) and the article 'a',
    # which is oddly missing from the stop list
    if w == u'' or w == u'a':
        return False
    return w not in stopw

juliuscleanwords = (juliusrdd
                    .flatMap(lambda line: line.lower().split(' '))
                    .map(lambda w: w.strip('.,"\''))  # strip leading/trailing punctuation
                    .filter(notStopWord))
print juliuscleanwords.take(20)
Now, use the word counting MapReduce code you've seen before. Count the number of times each word occurs and print the top 20 results as a list of tuples of the form (word, count). Hint: use takeOrdered() instead of take().
In [25]:
# your turn
counts = (juliusrdd
          .flatMap(lambda line: line.lower().split(' '))
          .map(lambda w: w.strip('.,"\''))
          .filter(notStopWord)
          .map(lambda w: (w, 1))
          .reduceByKey(lambda a, b: a + b))
top20 = counts.takeOrdered(20, key=lambda x: -x[1])
print top20
Plot a bar graph. For each of the top 20 words on the X axis, represent the count on the Y axis.
In [26]:
# your turn
import matplotlib.pyplot as plt
words, wordcounts = zip(*top20)                      # unzip the (word, count) pairs
ax = sns.barplot(x=list(wordcounts), y=list(words))  # horizontal bars: count on x, word on y
ax.set(xlabel='Count', ylabel='Word')
ax.set_title("Most common words in Shakespeare's Julius Caesar")
plt.show()
In order to make your code more efficient, you want to use all of the available processing power, even on a single laptop. If your machine has multiple cores, you can tune the number of partitions to use all of them! From http://www.stat.berkeley.edu/scf/paciorek-spark-2014.html:
You want each partition to be able to fit in the memory available on a node, and if you have multi-core nodes, you want as many partitions as there are cores to be able to fit in memory.
For load balancing you'll want at least as many partitions as total computational cores in your cluster, and probably rather more partitions. The Spark documentation suggests 2-4 partitions (which they also seem to call slices) per CPU. Often there are 100-10,000 partitions. Another rule of thumb is that tasks should take at least 100 ms; if they take less than that, you may want to repartition to have fewer partitions.
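As a quick illustration (a sketch on the juliusrdd from above, not part of the original text), you can inspect and change the partitioning of an RDD directly:
print juliusrdd.getNumPartitions()       # how many partitions textFile gave us
juliusmore = juliusrdd.repartition(8)    # full shuffle into 8 partitions
juliusfewer = juliusmore.coalesce(2)     # coalesce avoids a shuffle when reducing the partition count
print juliusmore.getNumPartitions(), juliusfewer.getNumPartitions()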
In [27]:
shakesrdd=sc.textFile("./sparklect/shakes/*.txt", minPartitions=4)
In [28]:
shakesrdd.take(10)
Out[28]:
Now calculate the top 20 words in all of the files that you just read.
In [29]:
# your turn
counts = (shakesrdd
          .flatMap(lambda line: line.lower().split(' '))
          .map(lambda w: w.strip('.,"\''))
          .filter(notStopWord)
          .map(lambda w: (w, 1))
          .reduceByKey(lambda a, b: a + b))
top20 = counts.takeOrdered(20, key = lambda x: -x[1])
print top20
Convert a Spark DataFrame to pandas:
pandas_df = spark_df.toPandas()
Create a Spark DataFrame from pandas:
spark_df = context.createDataFrame(pandas_df)
In either direction the data must fit in memory on a single machine.
VERY IMPORTANT: DataFrames in Spark are like RDDs in the sense that they are an immutable data structure.
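As an illustrative sketch of the round trip (the toy DataFrame and names here are made up for this example; the actual heights/weights data is loaded in the next cell):
from pyspark.sql import SQLContext
toysqlsc = SQLContext(sc)
toy_pdf = pd.DataFrame({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
toy_sdf = toysqlsc.createDataFrame(toy_pdf)   # pandas -> Spark
back_pdf = toy_sdf.toPandas()                 # Spark -> pandas: collects every row to the driver
print back_pdf
filtered = toy_sdf.filter(toy_sdf.x > 1)      # immutability: transformations return a new DataFrame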
In [30]:
df=pd.read_csv("https://dl.dropboxusercontent.com/u/75194/stats/data/01_heights_weights_genders.csv")
df.head()
Out[30]:
Convert this pandas dataframe to a Spark dataframe
In [31]:
from pyspark.sql import SQLContext
sqlsc=SQLContext(sc)
sparkdf = sqlsc.createDataFrame(df)
sparkdf
Out[31]:
In [32]:
sparkdf.show(5)
In [33]:
type(sparkdf.Gender)
Out[33]:
In [34]:
temp = sparkdf.map(lambda r: r.Gender)
print type(temp)
temp.take(10)
Out[34]:
In [36]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint
Now create a data set of LabeledPoints from the Spark dataframe
In [38]:
data=sparkdf.map(lambda row: LabeledPoint(row.Gender=='Male',[row.Height, row.Weight]))
data.take(5)
Out[38]:
In [39]:
data2=sparkdf.map(lambda row: LabeledPoint(row[0]=='Male',row[1:]))
data2.take(1)[0].label, data2.take(1)[0].features
Out[39]:
Split the data set into training and test sets
In [40]:
train, test = data.randomSplit([0.7,0.3])
train.cache()
test.cache()
Out[40]:
In [41]:
type(train)
Out[41]:
Train the logistic regression model using MLlib
In [42]:
model = LogisticRegressionWithLBFGS.train(train)
In [43]:
model.weights
Out[43]:
Run it on the test data
In [44]:
results = test.map(lambda lp: (lp.label, float(model.predict(lp.features))))
print results.take(10)
type(results)
Out[44]:
Measure accuracy and other metrics
In [45]:
test_accuracy=results.filter(lambda (a,p): a==p).count()/float(results.count())
test_accuracy
Out[45]:
In [46]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
# BinaryClassificationMetrics expects (score, label) pairs, so swap our (label, prediction) tuples
metrics = BinaryClassificationMetrics(results.map(lambda (a, p): (p, a)))
In [47]:
print type(metrics)
metrics.areaUnderROC
Out[47]:
In [48]:
type(model)
Out[48]:
In [51]:
#!rm -rf mylogistic.model
In [55]:
#model.save(sc, "mylogistic.model")
The pipeline API automates a lot of this stuff, allowing us to work directly on dataframes. Not all of it is supported in Python yet.
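As a rough sketch of what that looks like (this uses the spark.ml Pipeline, StringIndexer, VectorAssembler, and LogisticRegression classes on the sparkdf from above; it assumes a Spark version whose Python API includes them, and it is an illustration rather than the lab's solution):
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

indexer = StringIndexer(inputCol='Gender', outputCol='label')                      # 'Male'/'Female' -> 0.0/1.0
assembler = VectorAssembler(inputCols=['Height', 'Weight'], outputCol='features')  # assemble a feature vector
lr = LogisticRegression(maxIter=10)

pipeline = Pipeline(stages=[indexer, assembler, lr])
pipemodel = pipeline.fit(sparkdf)                        # fit all stages directly on the dataframe
pipemodel.transform(sparkdf).select('Gender', 'prediction').show(5)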
Also see: rdd.saveAsTextFile(), which saves an RDD as text files (one part file per partition), writing the string representation of each element.
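For instance (the output path here is just an illustrative choice, and saveAsTextFile will fail if that path already exists):
results.saveAsTextFile('./sparklect/gender_predictions')     # writes one text part-file per partition
print sc.textFile('./sparklect/gender_predictions').take(3)  # each line is the str() of a (label, prediction) tuple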
In [56]:
sc.stop()
In [ ]: