Set environment variables:
Edit spark-env.sh to add the required exports (a minimal sketch, with illustrative values, is shown below):
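# spark-env.sh -- a minimal sketch; the variables shown are standard Spark settings,
# but the values are illustrative assumptions, not taken from this notebook
export JAVA_HOME=/path/to/java
export PYSPARK_PYTHON=python2.7      # Python interpreter used by the workers
export SPARK_WORKER_MEMORY=2g        # memory available to each worker
export SPARK_WORKER_CORES=2          # cores available to each worker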
Submit a job with:
$SPARK_HOME/bin/spark-submit --master <master-url> --py-files <comma-separated Python dependencies as zip> our-python-code.py <custom args>
HELP: $SPARK_HOME/bin/spark-submit --help
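For example, against the standalone master used later in this notebook (deps.zip and the trailing arguments are placeholders):
$SPARK_HOME/bin/spark-submit --master spark://snehasish-barmans-macbook.local:7077 --py-files deps.zip our-python-code.py arg1 arg2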
In [2]:
import os
import sys
sys.path.append(os.environ["SPARK_HOME"] + "/python/lib/py4j-0.9-src.zip")
sys.path.append(os.environ["SPARK_HOME"] + "/python/lib/pyspark.zip")
In [61]:
from pyspark import SparkConf, SparkContext
from pyspark import SparkFiles
from pyspark import StorageLevel
from pyspark import AccumulatorParam
In [4]:
sconf = SparkConf()
sconf.setAppName("PySpark Tutorial")
sconf.setMaster("spark://snehasish-barmans-macbook.local:7077")
sc = SparkContext.getOrCreate(conf = sconf)
In [5]:
print sc
print sc.version
In [35]:
sc.parallelize([1,2, "abc", (1,2), {4,5,6}]).collect()
Out[35]:
In [30]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3)
rdd2 = sc.parallelize(xrange(10, 20), 3)
In [31]:
print rdd.glom().collect() # shows data grouped by partitions
print rdd2.glom().collect()
In [7]:
rdd.map(lambda x: x**2).collect() # map 1-to-1 transformation, operates on every element of rdd
Out[7]:
In [19]:
# functions passed to transformations must be self-contained: no external state, no reliance on driver-side globals
def trans1(x):
    return x**2

rdd.map(trans1).collect()
Out[19]:
In [20]:
rdd.filter(lambda x: x > 5).collect() # filter
Out[20]:
In [48]:
datasets = "../../../Machine_Learning/WPI_ML/datasets"
print os.path.exists(datasets)
#print os.path.realpath(datasets)
In [55]:
# the default filesystem is HDFS; to read local files, prefix the path with the file:/// scheme
textrdd = sc.textFile("file:///" + os.path.realpath(datasets) + "/Audio_Standardization_ Sentences.txt",
                      use_unicode = False, minPartitions = 3)
In [56]:
textrdd.glom().collect()
Out[56]:
In [62]:
textrdd.flatMap(lambda x: x.split(" ")).take(11) # 1-to-many transformation; flattens all results into a single RDD
Out[62]:
In [63]:
def countWordsInPartition(iterator):
    """
    @params:
        iterator: a partition of the rdd
    """
    count = 0
    for x in iterator:
        count += len(x.split(" "))
    yield count

textrdd.mapPartitions(countWordsInPartition).collect() # same as map, but operates on each chunk/partition of the rdd
Out[63]:
In [65]:
rdd.sortBy(keyfunc = lambda x: x, ascending = False, numPartitions = 1).collect() # sorting
# numPartitions controls the level of parallelism
Out[65]:
In [76]:
rdd.sample(withReplacement = False, fraction = 0.5, seed = 13).collect() # sampling
Out[76]:
In [78]:
rdd.coalesce(1).glom().collect() # reduce no. of partitions by combining partitions from each worker, thereby minimizing network traffic
Out[78]:
In [80]:
rdd.repartition(2).glom().collect()
# increases or decreases the no. of partitions, but at the cost of more network traffic,
# because Spark has to shuffle the data across the workers.
# Use coalesce when the intent is only to decrease the number of partitions.
Out[80]:
In [81]:
rdd.repartition(5).glom().collect()
Out[81]:
In [89]:
rdd.union(rdd2).collect() # combines two rdds -> A u B
Out[89]:
In [90]:
rdd.intersection(rdd2).collect() # intersection -> A n B
Out[90]:
In [46]:
rdd.subtract(rdd2).collect() # subtract -> A - B, removes all the common elements between A and B from A and returns the rest
Out[46]:
In [96]:
rdd.union(rdd2).distinct().sortBy(ascending = True, keyfunc = lambda x:x).collect() # distinct
Out[96]:
In [98]:
rdd.cartesian(rdd2).take(5) # all pair combinations; creates key-value RDD
Out[98]:
In [46]:
rdd.zip(rdd2).collect() # zip (same as zip() in python); creates key-value RDD
Out[46]:
In [38]:
rdd.keyBy(lambda x: x % 3).collect() # keyBy (converts a normal RDD into a key-value RDD based on a criterion)
# the result of the criterion becomes the 'key' and the element itself becomes the 'value'.
Out[38]:
In [41]:
print rdd.groupBy(lambda x: x % 3).collect() # groupBy - same as 'keyBy' but all the values of a key are grouped into an iterable
print list(rdd.groupBy(lambda x: x % 3).collect()[0][1])
In [7]:
file_name = "square_nums.py"
sc.addFile("./" + file_name) # All workers will download this file to their node
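The contents of square_nums.py are not shown in the notebook; a plausible sketch (hypothetical, not the original file) that reads one number per line from standard input and writes its square to standard output, which is what rdd.pipe() expects:
#!/usr/bin/env python
# square_nums.py -- hypothetical contents; rdd.pipe() feeds each element of a
# partition as a line on stdin and collects each line written to stdout
import sys
for line in sys.stdin:
    line = line.strip()
    if line:
        print int(line) ** 2
The script must be executable (e.g. chmod +x square_nums.py) so that pipe() can invoke it by path.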
In [9]:
rdd.pipe("cat").collect() # pipe
# Use an external program for custom transformations.
# Each element of a partition is written as a line to the program's standard input,
# and each line the program writes to standard output becomes an element of the result.
Out[9]:
In [8]:
rdd.pipe(SparkFiles.get(file_name)).glom().collect() # pipe
Out[8]:
In [10]:
rdd.reduce(lambda acc, x: acc+x) # reduce; the operation must be associative and commutative
Out[10]:
In [11]:
rdd.count() # count
Out[11]:
In [12]:
rdd.take(4) # take (returns as a list; selects data from one partition, then moves to another partition as required to satisfy the limit)
Out[12]:
In [13]:
rdd.takeSample(False, 5, seed = 13) # takeSample
Out[13]:
In [15]:
rdd.takeOrdered(4, key = lambda x:x) # takeOrdered
Out[15]:
In [16]:
rdd.collect()
Out[16]:
In [21]:
rdd.first() # first
Out[21]:
In [26]:
rdd.top(4, key = int) # top ; returns top n items in descending order
Out[26]:
In [27]:
rdd.countApprox(1000, 0.5) # countApprox
Out[27]:
In [35]:
rdd.countApproxDistinct(0.7) # number of distinct elements
Out[35]:
In [32]:
def showValues(x):
    print "hello: " + str(x)

rdd.foreach(showValues) # foreach (applies a function to every element of the rdd)
# useful for communicating with external services, accumulating values in a queue, logging info, ...
# NOTE: the output appears in the stderr file of the worker's working dir, not in this notebook
In [37]:
def showValuesPartition(iterator):
    vals = []
    for item in iterator:
        vals.append("hello: " + str(item))
    print vals

rdd.foreachPartition(showValuesPartition) # foreachPartition (applies a function per partition of the rdd)
In [38]:
rdd.max()
Out[38]:
In [39]:
rdd.min()
Out[39]:
In [40]:
rdd.stats()
Out[40]:
In [41]:
rdd.sum()
Out[41]:
In [42]:
rdd.mean()
Out[42]:
In [44]:
rdd.stdev()
Out[44]:
In [51]:
# the path must be an absolute path to a directory; the default scheme is HDFS
# creates a part-xxxx file for each partition
rdd.saveAsTextFile("file:///" + os.path.realpath("./textfiles")) # saveAsTextFile
In [52]:
# using compression
# compresses part-xxxx file of each partition
rdd.saveAsTextFile("file:///" + os.path.realpath("./textfileszip"),
compressionCodecClass = "org.apache.hadoop.io.compress.GzipCodec") # saveAsTextFile
In [53]:
rdd.saveAsPickleFile("file:///" + os.path.realpath("./textfiles-pickled")) # saveAsPickleFile (faster reads, writes)
In [32]:
rdd.countByValue() # countByValue - returns as dict of value: count
Out[32]:
In [39]:
rdd.isEmpty() # isEmpty
Out[39]:
In [43]:
print rdd.getStorageLevel() # getStorageLevel
In [44]:
rdd.getNumPartitions() # getNumPartitions
Out[44]:
In [54]:
rdd.persist(StorageLevel.DISK_ONLY)
print rdd.is_cached
print rdd.getStorageLevel()
rdd.unpersist()
print rdd.is_cached
print rdd.getStorageLevel()
In [6]:
krdd = sc.parallelize([("a", 1), ("a", 2), ("b", 1), ("b", 2), ("c", 1)], 2)
krdd2 = sc.parallelize([("a", 3), ("b", 3), ("d", 1)], 2)
In [7]:
print krdd.glom().collect()
print krdd2.glom().collect()
In [8]:
krdd.groupByKey().collect() # groupByKey
Out[8]:
In [9]:
list(krdd.groupByKey().collect()[0][1])
Out[9]:
In [11]:
krdd.reduceByKey(lambda acc, x: acc + x, numPartitions = 1).collect() # reduceByKey
# does a groupByKey, followed by reduction
# operation must obey associative and commutative properties
# numPartitions controls the level of parallelism
Out[11]:
In [16]:
# http://www.learnbymarketing.com/618/pyspark-rdd-basics-examples/
# like reduceByKey, but the aggregated result may have a different type than the values:
# takes an initial "zero" value per key, a function that merges a value into the aggregate
# within a partition, and a function that merges aggregates across partitions
# define a resultset template (any data structure) with initial values
init_state_template = [0]

def mergeValuesWithinPartition(template, val):
    template[0] = template[0] + val
    return template

def mergePartitions(template1, template2):
    return [template1[0] + template2[0]]  # keep the same template structure for every key

krdd.aggregateByKey(init_state_template,
                    mergeValuesWithinPartition,
                    mergePartitions).collect() # aggregateByKey
Out[16]:
In [17]:
krdd.sortByKey(ascending = False, numPartitions = 1, keyfunc = lambda x: x).collect() # sortByKey (can also use sortBy)
Out[17]:
In [46]:
krdd.join(krdd2).collect() # join (inner join in SQL; for each key, returns all pairs of values from the two RDDs)
Out[46]:
In [47]:
krdd.leftOuterJoin(krdd2).collect() # leftOuterJoin (left join in SQL)
Out[47]:
In [48]:
krdd.rightOuterJoin(krdd2).collect() # rightOuterJoin (right join in SQL)
Out[48]:
In [49]:
krdd.fullOuterJoin(krdd2).collect() # fullOuterJoin (full join in SQL)
Out[49]:
In [50]:
krdd.cogroup(krdd2).collect() # cogroup (for each key, returns one iterable of values per RDD)
Out[50]:
In [52]:
print list(krdd.cogroup(krdd2).collect()[0][1][0])
print list(krdd.cogroup(krdd2).collect()[0][1][1])
In [53]:
print list(krdd.cogroup(krdd2).collect()[2][1][0])
print list(krdd.cogroup(krdd2).collect()[2][1][1])
In [14]:
krdd.mapValues(lambda x: x**2).collect() # mapValues
Out[14]:
In [19]:
krdd_val_iter = sc.parallelize([("a", [1,2,3]), ("b", [4,5,6])])
krdd_val_iter.flatMapValues(lambda x: [y**2 for y in x]).collect() # flatMapValues
# applies when the value is an iterable object
# unpacks each element of the iterable into its own key-value pair, all flattened into a single RDD
Out[19]:
In [20]:
krdd_val_iter.mapValues(lambda x: [y**2 for y in x]).collect() # mapValues 1-to-1
Out[20]:
In [26]:
krdd.keys().collect() # keys
Out[26]:
In [25]:
krdd.values().collect() # values
Out[25]:
In [8]:
krdd.collect()
Out[8]:
In [9]:
krdd.count()
Out[9]:
In [10]:
krdd.take(3)
Out[10]:
In [12]:
krdd_dup = sc.parallelize([("a", 1), ("a", 1)])
krdd_dup.distinct().collect()
Out[12]:
In [27]:
krdd.countByKey() # countByKey - number of times a key appears in the k-v rdd
Out[27]:
In [33]:
krdd.lookup("a") # lookup
Out[33]:
In [34]:
krdd.toDebugString() # toDebugString (describes the lineage, i.e. the recursive dependencies, of this rdd for debugging purposes)
Out[34]:
In [45]:
krdd.collectAsMap() # collectAsMap -> return key-value RDD as a dictionary
Out[45]:
Key guideline: reduce the number of data shuffles across the cluster. Wide operations such as groupByKey, repartition, and the joins move data between workers and are the most expensive steps in a job, as the sketch below illustrates.
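A small illustration of the guideline (a sketch reusing krdd from the cells above): reduceByKey combines values inside each partition before shuffling, while groupByKey ships every (key, value) pair across the network before summing.
print krdd.groupByKey().mapValues(sum).collect()          # every pair is shuffled, then summed
print krdd.reduceByKey(lambda acc, x: acc + x).collect()  # only per-partition partial sums are shuffled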
Accumulator: a shared variable that workers can only add to and the driver can read; useful for gathering statistics or metadata about the data for logging, debugging, tracking, or other purposes.
In [60]:
# the default accumulator accumulates only numeric (int and float) types; only does the 'add' operation (commutative and associative)
accum = sc.accumulator(0, accum_param = None)

def squareValues(x):
    global accum
    #accum += 1
    accum.add(1)
    return x**2

print rdd.map(squareValues).collect()
print "No. of elements: %d" % accum.value
In [ ]:
# custom accumulator to support other types; here it collects the distinct values seen into a set
class CustomAccumulator(AccumulatorParam):

    def zero(self, initialValue):
        # every worker starts from an empty set
        return set()

    def addInPlace(self, template1, template2):
        return template1.union(template2)

accum = sc.accumulator(set(), accum_param = CustomAccumulator())

def squareValues(x):
    global accum
    accum += {x}  # add a one-element set so addInPlace always merges two sets
    return x**2

print rdd.map(squareValues).collect()
print "Elements seen: %s" % accum.value
Broadcast variable: another shared variable, used to ship a small read-only dataset to every executor once, so tasks can reference it without it being serialized with every closure.
In [72]:
bb = sc.broadcast({"a": 10, "b": 15})
print bb.value
bb.unpersist() # deletes cached copies from the executors
In [73]:
bb.value
Out[73]:
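A short usage sketch (an illustration, not from the original notebook; the name lookup is hypothetical): tasks read the broadcast dictionary through .value inside a transformation.
lookup = sc.broadcast({"a": 10, "b": 15})
print sc.parallelize(["a", "b", "a", "c"]).map(lambda k: lookup.value.get(k, 0)).collect()  # [10, 15, 10, 0]
lookup.unpersist()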
In [ ]: