Install PySpark (and make it run in a Jupyter notebook with the Anaconda Python):

  • go to http://spark.apache.org/downloads.html and download the Spark version you need (the extracted directory will be your SPARK_HOME)
  • run python setup.py install inside the python directory of the Spark distribution
  • make sure Java is installed (Scala is only needed if you also want to use the Scala API)
  • export SPARK_HOME="/home/ale/Documents/mylibraries/spark-2.1.0-bin-hadoop2.7"
    export PATH="$SPARK_HOME/bin:$PATH"
    export PYSPARK_DRIVER_PYTHON=jupyter
    export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
    export PYSPARK_PYTHON=/home/ale/anaconda3/bin/python
  • start pyspark (the executable is in the SPARK_HOME/bin directory) from a terminal: it will open a Jupyter notebook running the Anaconda Python (a quick sanity check is sketched right below)
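
Once the notebook is open, a quick sanity check (just a sketch: sc is the SparkContext that the PySpark shell creates automatically, and sys.executable should point to the Anaconda Python if the exports above took effect):

In [ ]:
import sys
print(sys.executable)  # driver Python; should be the Anaconda one if the setup above worked
print(sc.version)      # Spark version of the running context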

In [1]:
## check whether Spark is running (watching the CPU in htop can help too)
sc._jsc.sc().isStopped()

# To stop Spark, restart the notebook kernel or call
# sc.stop()


Out[1]:
False

In [4]:
%%time
# estimate the value of pi with a Monte Carlo simulation in PySpark
import random
NUM_SAMPLES = 100000000
def inside(p):
    # the argument is ignored: each sample just draws a random point in the unit square
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))


Pi is roughly 3.141516
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 14.5 s

In [5]:
%%time
# the same computation in plain Python (no Spark), for comparison
count = len([j for j in range(0, NUM_SAMPLES) if inside(j)])
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))


Pi is roughly 3.141539
CPU times: user 37.2 s, sys: 652 ms, total: 37.8 s
Wall time: 37.8 s

Spark Transformations:

  • map()
  • flatMap()
  • filter()
  • sample()
  • union()
  • intersection()
  • distinct()
  • join()

Spark Actions (a short example of a few of these follows the list):

  • reduce()
  • collect()
  • count()
  • first()
  • takeSample(withReplacement, num, [seed])
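
A quick sketch on a small hand-made RDD (nums is just an illustrative name); reduce(), first() and takeSample() are not used again below, while the other actions appear in the following cells:

In [ ]:
nums = sc.parallelize([1, 2, 3, 4, 5])
print(nums.reduce(lambda a, b: a + b))   # 15: aggregate all elements with the given function
print(nums.first())                      # 1: the first element of the RDD
print(nums.takeSample(False, 3, 42))     # 3 random elements, without replacement, seed 42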

In [6]:
confusedRDD = sc.textFile("confused.txt")

In [7]:
confusedRDD.take(4)


Out[7]:
['Confusion is the inability to think as clearly or quickly as you normally do.',
 '',
 'You may  have difficulty paying attention to anything , remembering anyone, and making decisions.',
 '']

In [8]:
# many of the functions are the same as for the Scala collections (map, flatMap, filter)
mapRDD = confusedRDD.map(lambda x: x.split(' '))
flatmapRDD = confusedRDD.flatMap(lambda x: x.split(' '))

In [9]:
mapRDD.take(3)


Out[9]:
[['Confusion',
  'is',
  'the',
  'inability',
  'to',
  'think',
  'as',
  'clearly',
  'or',
  'quickly',
  'as',
  'you',
  'normally',
  'do.'],
 [''],
 ['You',
  'may',
  '',
  'have',
  'difficulty',
  'paying',
  'attention',
  'to',
  'anything',
  ',',
  'remembering',
  'anyone,',
  'and',
  'making',
  'decisions.']]

In [10]:
flatmapRDD.take(3)


Out[10]:
['Confusion', 'is', 'the']

In [11]:
onlyconfusion = confusedRDD.filter(lambda x : ("confus" in x.lower()))
onlyconfusion.count()


Out[11]:
7

In [12]:
onlyconfusion.collect()


Out[12]:
['Confusion is the inability to think as clearly or quickly as you normally do.',
 'Confusion may come to anyone early or late phase of the life, depending on the reason behind it .',
 'Many times, confusion lasts for a very short span and goes away.',
 'Confusion is more common in people who are in late stages of the life and often occurs when you have stayed in hospital.',
 'Some confused people may have strange or unusual behavior or may act aggressively.',
 'A good way to find out if anyone is confused is to question the person their identity i.e. name, age, and the date.',
 'If they are little not sure or unable to answer correctly, they are confused']

In [13]:
# This transformation picks a random sample from a larger RDD.
# It is frequently used in machine learning, where a sample of the dataset is needed.
# The fraction is the expected fraction of the data to keep (the sample size is not exact);
# the last argument is the random seed.
sampledconfusion = confusedRDD.sample(False, 0.5, 4)  # first argument: withReplacement (False here)
sampledconfusion.collect()


Out[13]:
['',
 '',
 '',
 'Other times, it may be permanent and has no cure. It may have association with delirium or dementia.',
 'Confusion is more common in people who are in late stages of the life and often occurs when you have stayed in hospital.',
 '',
 'Some confused people may have strange or unusual behavior or may act aggressively.',
 'A good way to find out if anyone is confused is to question the person their identity i.e. name, age, and the date.',
 '']

In [14]:
# union() merges two RDDs by concatenating them (they should have the same structure; duplicates are kept).
# (here each element is a (string, int) pair)
abhay_marks = [("physics",85),("maths",75),("chemistry",95)]
ankur_marks = [("physics",65),("maths",45),("chemistry",85)]

abhay = sc.parallelize(abhay_marks)
ankur = sc.parallelize(ankur_marks)

abhay.union(ankur).collect()


Out[14]:
[('physics', 85),
 ('maths', 75),
 ('chemistry', 95),
 ('physics', 65),
 ('maths', 45),
 ('chemistry', 85)]

In [15]:
# This transformation joins two RDDs based on a common key.
Subject_wise_marks = abhay.join(ankur)
Subject_wise_marks.collect()


Out[15]:
[('physics', (85, 65)), ('maths', (75, 45)), ('chemistry', (95, 85))]

In [16]:
# Intersection gives you the common elements of the two RDDs.
Cricket_team = ["sachin","abhay","michael","rahane","david","ross","raj","rahul","hussy","steven","sourav"]
Toppers = ["rahul","abhay","laxman","bill","steve"]

cricketRDD = sc.parallelize(Cricket_team)
toppersRDD = sc.parallelize(Toppers)

toppercricketers = toppersRDD.intersection(cricketRDD)
toppercricketers.collect()


Out[16]:
['rahul', 'abhay']

In [17]:
# This transformation removes duplicates.
# As the name suggests, it keeps only the distinct elements of the RDD.
best_story = ["movie1","movie3","movie7","movie5","movie8"]
best_direction = ["movie11","movie1","movie5","movie10","movie7"]
best_screenplay = ["movie10","movie4","movie6","movie7","movie3"]

story_rdd = sc.parallelize(best_story)
direction_rdd = sc.parallelize(best_direction)
screen_rdd = sc.parallelize(best_screenplay)

total_nomination_rdd = story_rdd.union(direction_rdd).union(screen_rdd)
print(total_nomination_rdd.collect())

unique_movies_rdd = total_nomination_rdd.distinct()
unique_movies_rdd.collect()


['movie1', 'movie3', 'movie7', 'movie5', 'movie8', 'movie11', 'movie1', 'movie5', 'movie10', 'movie7', 'movie10', 'movie4', 'movie6', 'movie7', 'movie3']
Out[17]:
['movie10',
 'movie8',
 'movie5',
 'movie1',
 'movie4',
 'movie7',
 'movie3',
 'movie11',
 'movie6']

Parallelism is the key feature of any distributed system: operations are performed by splitting the data into multiple partitions, and the same operation runs on the partitions simultaneously, which is what makes data processing with Spark fast. Map and reduce operations can be applied in parallel in Apache Spark by dividing the data into multiple partitions. The partitions of an RDD are distributed across several workers running on different nodes of the cluster, and if a single worker fails the lost partitions can be recomputed from the RDD's lineage, so the RDD remains available.

The degree of parallelism of each operation on an RDD depends on the number of partitions the RDD has. We can specify the number of partitions when creating the RDD, or change it later with the repartition() and coalesce() methods.

coalesce() is an optimized version of repartition() that avoids a full shuffle; it can only decrease the number of partitions and is typically used after filtering a large dataset.
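
A minimal sketch of the two methods (the partition counts are only illustrative):

In [ ]:
rdd = sc.parallelize(range(100), 8)
print(rdd.getNumPartitions())                  # 8
print(rdd.repartition(16).getNumPartitions())  # 16: full shuffle, can increase or decrease
print(rdd.coalesce(2).getNumPartitions())      # 2: no full shuffle, can only decrease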


In [22]:
filename = "/home/ale/.steam/steam/steamapps/common/Pillars of Eternity/PillarsOfEternity_Data/data/conversations/10_od_nua/1002_cv_pet_wurm.conversation"  # spaces must not be backslash-escaped inside a Python string
partRDD = sc.textFile(filename, 4)  # ask for (at least) 4 partitions

In [23]:
partRDD.getNumPartitions()


Out[23]:
4

When processing data with the reduceByKey operation, Spark creates as many output partitions as the default parallelism, which depends on the number of nodes and the cores available on each node, unless the number of partitions is passed explicitly.
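
A short sketch of reduceByKey using the flatmapRDD built earlier, explicitly asking for 2 output partitions instead of the default parallelism (word_counts is just an illustrative name):

In [ ]:
word_counts = (flatmapRDD
               .map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b, numPartitions=2))
print(word_counts.getNumPartitions())  # 2
word_counts.take(5)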


In [ ]:
# mapPartitions() runs a function once per partition: the function receives an iterator
# over the partition's elements, unlike map(), which is called on every single element.
partRDD.mapPartitions(lambda part: [sum(1 for _ in part)]).collect()   # elements per partition

# mapPartitionsWithIndex() works the same way, but the function also receives
# the index of the partition it is processing.
partRDD.mapPartitionsWithIndex(lambda idx, part: [(idx, sum(1 for _ in part))]).collect()

Accumulators in Spark are shared variables: tasks running on the workers can only add to them, while the driver can read their value. The scope of a normal variable is limited to a single task.
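
A minimal sketch (acc is just an illustrative name; a fuller example with the MovieLens data follows below):

In [ ]:
acc = sc.accumulator(0)
sc.parallelize(range(10)).foreach(lambda x: acc.add(x))  # each task adds to the accumulator
acc.value   # 45, read back on the driver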


Example using the users file (u.user) of the MovieLens ml-100k dataset


In [25]:
userRDD = sc.textFile("./ml-100k/u.user")

In [30]:
userRDD.take(10)


Out[30]:
['1|24|M|technician|85711',
 '2|53|F|other|94043',
 '3|23|M|writer|32067',
 '4|24|M|technician|43537',
 '5|33|F|other|15213',
 '6|42|M|executive|98101',
 '7|57|M|administrator|91344',
 '8|36|M|administrator|05201',
 '9|29|M|student|01002',
 '10|53|M|lawyer|90703']

In [32]:
# Transform the data: split each line and bucket the age
def parse_N_calculate_age(data):
    userid, age, gender, occupation, zipcode = data.split("|")
    return userid, age_group(int(age)), gender, occupation, zipcode, int(age)


def age_group(age):
    if age < 10:
        return '0-10'
    elif age < 20:
        return '10-20'
    elif age < 30:
        return '20-30'
    elif age < 40:
        return '30-40'
    elif age < 50:
        return '40-50'
    elif age < 60:
        return '50-60'
    elif age < 70:
        return '60-70'
    elif age < 80:
        return '70-80'
    else:
        return '80+'

In [33]:
data_with_age_bucket = userRDD.map(parse_N_calculate_age)
RDD_20_30 = data_with_age_bucket.filter(lambda line : '20-30' in line)

In [42]:
# occupation frequencies within the 20-30 age group
freq = RDD_20_30.map(lambda line: line[3]).countByValue()
dict(freq)


Out[42]:
{'administrator': 19,
 'artist': 12,
 'doctor': 2,
 'educator': 12,
 'engineer': 23,
 'entertainment': 8,
 'executive': 7,
 'healthcare': 4,
 'homemaker': 3,
 'lawyer': 4,
 'librarian': 11,
 'marketing': 5,
 'none': 2,
 'other': 38,
 'programmer': 30,
 'salesman': 2,
 'scientist': 8,
 'student': 116,
 'technician': 12,
 'writer': 14}

In [46]:
# An RDD cached with cache()/persist() can be removed from memory with unpersist()
# once we are done with it.
RDD_20_30.unpersist()


Out[46]:
PythonRDD[63] at RDD at PythonRDD.scala:48

In [47]:
Under_age = sc.accumulator(0)
Over_age = sc.accumulator(0)

# count users in the 0-10 and 70-80 buckets while passing the data through unchanged
def outliers(data):
    global Over_age, Under_age
    age_grp = data[1]
    if(age_grp == "70-80"):
        Over_age +=1
    if(age_grp == "0-10"):
        Under_age +=1
    return data

In [48]:
df = data_with_age_bucket.map(outliers).collect()

In [55]:
Under_age


Out[55]:
Accumulator<id=0, value=1>

In [56]:
Over_age


Out[56]:
Accumulator<id=1, value=4>

To run a Spark application in standalone mode:

  • create a .py file containing all the functions and commands (a minimal sketch follows this list), then run:
  • spark-submit myfile.py
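
A minimal sketch of such a file (the app name is only an example): unlike the interactive pyspark shell, a submitted script has to create its own SparkContext.

# myfile.py
from pyspark import SparkContext

sc = SparkContext(appName="MyStandaloneApp")
print(sc.parallelize(range(100)).sum())
sc.stop()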
