In [1]:
## Check whether Spark is running (watching CPU usage in htop can help too)
sc._jsc.sc().isStopped()
# To stop Spark, restart the Jupyter kernel or call
# sc.stop()
Out[1]:
In [4]:
%%time
# calculate the value of pi in PySpark
import random
NUM_SAMPLES = 100000000
def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1
count = sc.parallelize(range(0, NUM_SAMPLES)).filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
In [5]:
%%time
# same as above, in pure Python
count = len([j for j in range(0, NUM_SAMPLES) if inside(j)])
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))
In [6]:
confusedRDD = sc.textFile("confused.txt")
In [7]:
confusedRDD.take(4)
Out[7]:
In [8]:
# Many of the functions are the same as for Scala collections (map, flatMap, filter)
mapRDD = confusedRDD.map(lambda x: x.split(' '))
flatmapRDD = confusedRDD.flatMap(lambda x: x.split(' '))
In [9]:
mapRDD.take(3)
Out[9]:
In [10]:
flatmapRDD.take(3)
Out[10]:
In [11]:
onlyconfusion = confusedRDD.filter(lambda x: "confus" in x.lower())
onlyconfusion.count()
Out[11]:
In [12]:
onlyconfusion.collect()
Out[12]:
In [13]:
# This transformation picks a random sample from a larger RDD.
# It is frequently used in machine learning, where a sample of the dataset is often needed.
# The fraction is the expected proportion of the data to include in the sample.
sampledconfusion = confusedRDD.sample(False, 0.5, 4)  # arguments: withReplacement, fraction, seed
sampledconfusion.collect()
Out[13]:
In [14]:
# union merges two RDDs; it is most useful when they have the same structure
# (here each element is a (string, int) pair).
abhay_marks = [("physics",85),("maths",75),("chemistry",95)]
ankur_marks = [("physics",65),("maths",45),("chemistry",85)]
abhay = sc.parallelize(abhay_marks)
ankur = sc.parallelize(ankur_marks)
abhay.union(ankur).collect()
Out[14]:
In [15]:
# This transformation joins two RDDs based on a common key.
Subject_wise_marks = abhay.join(ankur)
Subject_wise_marks.collect()
Out[15]:
In [16]:
# intersection returns the elements common to the two RDDs.
Cricket_team = ["sachin","abhay","michael","rahane","david","ross","raj","rahul","hussy","steven","sourav"]
Toppers = ["rahul","abhay","laxman","bill","steve"]
cricketRDD = sc.parallelize(Cricket_team)
toppersRDD = sc.parallelize(Toppers)
toppercricketers = toppersRDD.intersection(cricketRDD)
toppercricketers.collect()
Out[16]:
In [17]:
# This transformation removes duplicates.
# As the name suggests, distinct keeps only the unique elements of the RDD.
best_story = ["movie1","movie3","movie7","movie5","movie8"]
best_direction = ["movie11","movie1","movie5","movie10","movie7"]
best_screenplay = ["movie10","movie4","movie6","movie7","movie3"]
story_rdd = sc.parallelize(best_story)
direction_rdd = sc.parallelize(best_direction)
screen_rdd = sc.parallelize(best_screenplay)
total_nomination_rdd = story_rdd.union(direction_rdd).union(screen_rdd)
print(total_nomination_rdd.collect())
unique_movies_rdd = total_nomination_rdd.distinct()
unique_movies_rdd.collect()
Out[17]:
Parallelism is the key feature of any distributed system: the data is split into multiple partitions, and the same operation runs on all partitions simultaneously, which is what makes data processing with Spark fast. Map and reduce operations can be applied in parallel in Apache Spark by dividing the data into multiple partitions. The partitions of an RDD are distributed across workers running on different nodes of the cluster, and if a single worker fails, Spark can recompute the lost partitions from the RDD's lineage, so the data remains available.
The degree of parallelism of each operation on an RDD depends on the number of partitions the RDD has. We can specify the number of partitions when creating the RDD, or change it later using the repartition() and coalesce() methods.
coalesce() is an optimized version of repartition() that avoids a full shuffle and is generally used to decrease the number of partitions, for example after filtering a large dataset.
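As a quick sketch of those two methods (reusing the total_nomination_rdd created above; the partition counts here are arbitrary choices, not recommendations):
In [ ]:
# repartition can increase (or decrease) the number of partitions, with a full shuffle
more_parts = total_nomination_rdd.repartition(8)
print(more_parts.getNumPartitions())
# coalesce reduces the number of partitions while avoiding a full shuffle
fewer_parts = more_parts.coalesce(2)
print(fewer_parts.getNumPartitions())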
In [22]:
filename = "/home/ale/.steam/steam/steamapps/common/Pillars of Eternity/PillarsOfEternity_Data/data/conversations/10_od_nua/1002_cv_pet_wurm.conversation"
partRDD = sc.textFile(filename, 4)
In [23]:
partRDD.getNumPartitions()
Out[23]:
When processing data with a reduceByKey operation, Spark creates output partitions based on the default parallelism, which depends on the number of nodes and the number of cores available on each node.
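As a small illustration (reusing the flatmapRDD from the word-splitting example above; the explicit second argument to reduceByKey sets the number of output partitions, here an arbitrary 4):
In [ ]:
# Word count where the number of output partitions is set explicitly
word_counts = flatmapRDD.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b, 4)
print(word_counts.getNumPartitions())
word_counts.take(5)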
In [ ]:
# mapPartitions runs the given function once per partition (on an iterator over that
# partition), unlike a normal map, which operates on every element of the RDD.
partRDD.mapPartitions(lambda part: [sum(1 for _ in part)]).collect()
# mapPartitionsWithIndex works like mapPartitions, but the function also receives
# the index of the partition it is processing.
partRDD.mapPartitionsWithIndex(lambda idx, part: [(idx, sum(1 for _ in part))]).collect()
Accumulators in Spark are global variables that can be shared across tasks, whereas the scope of a normal variable is limited to a single task.
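A minimal sketch of the idea (the counter and function names below are only illustrative): updates made inside tasks become visible on the driver through the accumulator's .value once an action has run.
In [ ]:
line_counter = sc.accumulator(0)
def count_line(line):
    # runs inside the executors' tasks; add() updates the shared accumulator
    line_counter.add(1)
    return line
confusedRDD.map(count_line).count()  # the action triggers the updates
line_counter.value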
In [25]:
userRDD = sc.textFile("./ml-100k/u.user")
In [30]:
userRDD.take(10)
Out[30]:
In [32]:
# Transform the data
def parse_N_calculate_age(data):
    userid, age, gender, occupation, zip = data.split("|")
    return userid, age_group(int(age)), gender, occupation, zip, int(age)

def age_group(age):
    if age < 10:
        return '0-10'
    elif age < 20:
        return '10-20'
    elif age < 30:
        return '20-30'
    elif age < 40:
        return '30-40'
    elif age < 50:
        return '40-50'
    elif age < 60:
        return '50-60'
    elif age < 70:
        return '60-70'
    elif age < 80:
        return '70-80'
    else:
        return '80+'
In [33]:
data_with_age_bucket = userRDD.map(parse_N_calculate_age)
RDD_20_30 = data_with_age_bucket.filter(lambda line : '20-30' in line)
In [42]:
freq = RDD_20_30.map(lambda line : line[3]).countByValue()
dict(freq)
Out[42]:
In [46]:
# Since we are done with the operations on the cached data above, we can remove it from memory using unpersist()
RDD_20_30.unpersist()
Out[46]:
In [47]:
Under_age = sc.accumulator(0)
Over_age = sc.accumulator(0)
def outliers(data):
    global Over_age, Under_age
    age_grp = data[1]
    if age_grp == "70-80":
        Over_age += 1
    if age_grp == "0-10":
        Under_age += 1
    return data
In [48]:
df = data_with_age_bucket.map(outliers).collect()
In [55]:
Under_age
Out[55]:
In [56]:
Over_age
Out[56]:
Running a Spark application in Standalone Mode
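As a hedged sketch only (the master URL and application name below are placeholders for your own standalone cluster), a standalone-mode application is typically a separate Python script submitted with spark-submit or run against the standalone master directly:
In [ ]:
# Minimal standalone-mode application sketch (run as a separate script,
# not inside this notebook, where an active SparkContext already exists).
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("StandaloneExample")       # placeholder application name
        .setMaster("spark://localhost:7077"))  # placeholder standalone master URL
sc = SparkContext(conf=conf)

# a tiny job to verify the application runs on the cluster
print(sc.parallelize(range(1000)).sum())

sc.stop()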