In [ ]:
import os
import sys

# Point PySpark at the local Spark installation
spark_home = '/opt/spark'
os.environ['SPARK_HOME'] = spark_home
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.1-src.zip'))

# Start the PySpark shell, which creates the SparkContext sc;
# exec(open(...).read()) also works on Python 3, where execfile() is gone
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())
Check the Spark UI at http://localhost:4040/
In [ ]:
example_rdd = sc.parallelize(['hello','spark','at','spjain'])
In [ ]:
example_rdd
In [ ]:
example_rdd.count()
In [ ]:
example_rdd.first()
In [ ]:
example_rdd.take(2)
In [ ]:
example_rdd.takeSample(False, 3)
In [ ]:
example_rdd.collect()
In [ ]:
example_rdd.saveAsTextFile("examplerdd2.txt")  # despite the name, this creates a directory of part files
In [ ]:
birdstrikes = sc.textFile("file:///home/bigdata/training/datasets/birdstrikes.csv")
Exercise: count the number of lines in this file. Take the first 5 lines. Take 10 sample rows (without repetition). Take the first row. Collect the whole dataset.
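One possible solution for the first four tasks; collect() on the whole dataset follows in the next cell:
In [ ]:
birdstrikes.count()                # number of lines
birdstrikes.take(5)                # first 5 lines
birdstrikes.takeSample(False, 10)  # 10 sample rows, without replacement
birdstrikes.first()                # first row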
In [ ]:
birdstrikes.collect()
Partitioning
In [ ]:
birdstrikes.getNumPartitions()
In [ ]:
birdstrikes.repartition(20)
In [ ]:
birdstrikes.getNumPartitions()
Uh oh. (Immutability: repartition() returns a new RDD instead of changing birdstrikes in place, so the result has to be assigned to a name.)
In [ ]:
rdd = birdstrikes.repartition(10)
In [ ]:
rdd.getNumPartitions()
In [ ]:
b = rdd.coalesce(5)
In [ ]:
b.getNumPartitions()  # coalesce() also returned a new RDD; rdd itself still has 10 partitions
Lambda functions
In [ ]:
def is_not_header(row):
    if row.startswith("id,"):
        return False
    else:
        return True
In [ ]:
rows = b.filter(is_not_header)
In [ ]:
rows.take(5)
In [ ]:
b.filter(is_not_header).take(5)
In [ ]:
b.filter(lambda row: not row.startswith("id,") ).take(5)
In [ ]:
rows.sortBy(lambda x: x).take(5)  # lexicographic sort on the whole row
In [ ]:
rows.sortBy(lambda x: int(x.split(',')[0])).take(5)  # numeric sort on the id column
In [ ]:
rows.take(5)
Grep
In [ ]:
mystring = "2,Airplane,2000-01-01,No damage,CONTINENTAL AIRLINES,New Jersey,"
In [ ]:
"AIRLINES" in mystring
In [ ]:
def has_airlines(mystr):
    if "AIRLINES" in mystr:
        return True
    else:
        return False
In [ ]:
has_airlines("2,Airplain, AIRLINES, something")
Exercise: How many accidents happened in California?
In [ ]:
rows.filter(lambda row: "California" in row).count()
Exercise: Count the rows that are in "California" and were caused by "Small" birds.
Exercise: Show 5 rows that are in "California" and were caused by "Small" birds.
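One possible solution, assuming a plain substring match is good enough here, as in the California count above:
In [ ]:
rows.filter(lambda row: "California" in row and "Small" in row).count()
In [ ]:
rows.filter(lambda row: "California" in row and "Small" in row).take(5)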
Select Fields
In [ ]:
states = rows.map(lambda x: x.split(",")[5])  # the state is field index 5, as in mystring above
states.take(5)
Exercise: Select 5 airlines.
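One possible solution, assuming the airline name is field index 4 (as in mystring above, where CONTINENTAL AIRLINES is the fifth field):
In [ ]:
rows.map(lambda x: x.split(",")[4]).take(5)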
In [ ]:
table = rows.map(lambda x: x.split(","))
table.take(2)
In [ ]:
twocolumns = table.map(lambda row: [ row[5], row[8] ])  # state and bird size
twocolumns.take(5)
Exercise: Select 5 airlines where the bird size = 'Medium'.
In [ ]:
table.filter(lambda x: x[8] == 'Medium').map(lambda x: x[5]).take(5)  # oops: x[5] is the state, not the airline
In [ ]:
table.filter(lambda x: x[8] == 'Medium').map(lambda x: x[4]).take(5)  # corrected: the airline is x[4]
Cache
In [ ]:
mediums = table.filter(lambda x: x[8] == 'Medium')
In [ ]:
mediums.cache()
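cache() is lazy: nothing is stored until an action forces the computation. One way to see it is to run the same action twice; the second run should be served from memory (check the Storage tab in the Spark UI):
In [ ]:
mediums.count()  # first action computes the RDD and caches it
In [ ]:
mediums.count()  # answered from the cache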
Distinct
In [ ]:
table.map(lambda x: x[5]).take(20)
In [ ]:
table.map(lambda x: x[5]).distinct().collect()
Reduce
In [ ]:
table = table.filter(lambda row: '' not in row)  # drop rows with any empty field, so int() below cannot fail
In [ ]:
table.map(lambda x: int(x[10])).reduce(lambda x, y: x + y)  # total cost over all rows
PairRDDs - grouping
In [ ]:
sizes = table.map(lambda x: (x[8], int(x[10])))  # (bird size, cost) pairs
In [ ]:
sizes.take(5)
In [ ]:
sizes.countByKey()
In [ ]:
sizes.reduceByKey(lambda x, y: x + y).collect()  # total cost per bird size
Exercise: Compute the cost sum for each aircraft type.
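One possible solution, assuming the aircraft type is field index 1 (as in mystring above, where Airplane is the second field):
In [ ]:
table.map(lambda x: (x[1], int(x[10]))).reduceByKey(lambda x, y: x + y).collect()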
In [ ]:
df = sizes.toDF(['size','cost'])
In [ ]:
df.take(2)
In [ ]:
df.toPandas()
In [ ]:
df.registerTempTable("costs")
In [ ]:
result_df = sqlContext.sql("SELECT * FROM costs LIMIT 5")
In [ ]:
result_df.show()
In [ ]:
sqlContext.sql("SELECT size, SUM(cost) FROM costs GROUP BY size").show()