In [ ]:
import os
import sys

# Point PySpark at the local Spark installation
spark_home = '/opt/spark'
os.environ['SPARK_HOME'] = spark_home

# Put the PySpark and Py4J libraries on the Python path
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.1-src.zip'))

# Start an interactive PySpark session; this defines sc (and sqlContext).
# exec(open(...).read()) works on both Python 2 and 3, unlike execfile().
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

In [ ]:
example_rdd = sc.parallelize(['hello','spark','at','spjain'])

In [ ]:
example_rdd

In [ ]:
example_rdd.count()

In [ ]:
example_rdd.first()

In [ ]:
example_rdd.take(2)

In [ ]:
example_rdd.takeSample(False, 3)

In [ ]:
example_rdd.collect()

In [ ]:
example_rdd.saveAsTextFile("examplerdd2.txt")  # writes a directory of part files, not a single file

In [ ]:
birdstrikes = sc.textFile("file:///home/bigdata/training/datasets/birdstrikes.csv")

Exercise: Count the number of lines in this file. Take the first 5 lines. Take 10 sample rows (without repetition). Take the first row. Collect the whole dataset.
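
A possible solution, reusing the actions shown above (a sketch; each result is printed, since a notebook cell only echoes its last expression, and collect() follows in the next cell):

In [ ]:
print(birdstrikes.count())                 # number of lines
print(birdstrikes.take(5))                 # first 5 lines
print(birdstrikes.takeSample(False, 10))   # 10 sample rows, without repetition
print(birdstrikes.first())                 # first row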


In [ ]:
birdstrikes.collect()

Partitioning


In [ ]:
birdstrikes.getNumPartitions()

In [ ]:
birdstrikes.repartition(20)

In [ ]:
birdstrikes.getNumPartitions()

Uh oh, still 2 partitions. (Immutability: repartition() returns a new RDD; it does not modify the original.)


In [ ]:
rdd = birdstrikes.repartition(10)

In [ ]:
rdd.getNumPartitions()

In [ ]:
b = rdd.coalesce(5)

In [ ]:
b.getNumPartitions()  # the coalesced RDD has 5 partitions; rdd itself is unchanged

lambda functions


In [ ]:
def is_not_header(row):
    if row.startswith("id,"):
        return False
    else:
        return True

In [ ]:
rows = b.filter(is_not_header)

In [ ]:
rows.take(5)

In [ ]:
b.filter(is_not_header).take(5)

In [ ]:
b.filter(lambda row: not row.startswith("id,")).take(5)

In [ ]:
rows.sortBy(lambda x: x).take(5)  # lexicographic sort: "10" sorts before "2"

In [ ]:
rows.sortBy(lambda x: int(x.split(',')[0])).take(5)  # numeric sort on the id field

In [ ]:
rows.take(5)  # rows itself is unchanged: sortBy returned new RDDs

Grep


In [ ]:
mystring = "2,Airplane,2000-01-01,No damage,CONTINENTAL AIRLINES,New Jersey,"

In [ ]:
"AIRLINES" in mystring

In [ ]:
def has_airlines(mystr):
    if "AIRLINES" in mystr:
        return True
    else:
        return False

In [ ]:
has_airlines("2,Airplain, AIRLINES, something")

Exercise: How many accidents happened in California?


In [ ]:
rows.filter(lambda row: "California" in row).count()

Exercise: Count the rows that are in "California" and caused by "Small" birds

Exercise: Show 5 rows that are in "California" and caused by "Small" birds
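
Possible solutions (a sketch; this is plain substring matching, so it could overmatch if "Small" happened to appear in another field):

In [ ]:
rows.filter(lambda row: "California" in row and "Small" in row).count()

In [ ]:
rows.filter(lambda row: "California" in row and "Small" in row).take(5)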

Select Fields


In [ ]:
states = rows.map(lambda x: x.split(",")[5])  # field 5 is the state
states.take(5)

Exercise: Select 5 airlines
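
A possible solution, assuming the airline name is the fifth field (index 4), as in the sample row above:

In [ ]:
rows.map(lambda x: x.split(",")[4]).take(5)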


In [ ]:
table = rows.map(lambda x: x.split(","))
table.take(2)

In [ ]:
twocolumns = table.map(lambda row: [row[5], row[8]])  # state and bird size
twocolumns.take(5)

Exercise: Select 5 airlines where the bird size = 'Medium'


In [ ]:
table.filter(lambda x: x[8] == 'Medium').map(lambda x: x[5]).take(5)  # oops: field 5 is the state, not the airline

In [ ]:
table.filter(lambda x: x[8] == 'Medium').map(lambda x: x[4]).take(5)  # field 4 is the airline

Cache


In [ ]:
mediums = table.filter(lambda x: x[8] == 'Medium')

In [ ]:
mediums.cache()
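
cache() only marks the RDD for caching; nothing is stored in memory until an action runs. A quick way to materialize it (a sketch):

In [ ]:
mediums.count()  # the first action materializes the cache; later actions read from memory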

Distinct


In [ ]:
table.map(lambda x: x[5]).take(20)

In [ ]:
table.map(lambda x: x[5]).distinct().collect()

Reduce


In [ ]:
table = table.filter(lambda row: '' not in row)  # drop rows with empty fields so int() below cannot fail

In [ ]:
table.map(lambda x: int(x[10])).reduce(lambda x, y: x + y)

PairRDDs - grouping


In [ ]:
sizes = table.map(lambda x: (x[8], int(x[10])))  # (bird size, cost) pairs

In [ ]:
sizes.take(5)

In [ ]:
sizes.countByKey()

In [ ]:
sizes.reduceByKey(lambda x, y: x + y).collect()

Exercise: Sum the cost for each aircraft type
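
A possible solution, assuming the aircraft type is field index 1 (as in the sample row above) and the cost is field index 10:

In [ ]:
table.map(lambda x: (x[1], int(x[10]))).reduceByKey(lambda x, y: x + y).collect()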


In [ ]:
df = sizes.toDF(['size','cost'])

In [ ]:
df.take(2)

In [ ]:
df.toPandas()

In [ ]:
df.registerTempTable("costs")  # Spark 1.x API; Spark 2+ uses createOrReplaceTempView

In [ ]:
result_df = sqlContext.sql("SELECT * FROM costs LIMIT 5")

In [ ]:
result_df.show()

In [ ]:
sqlContext.sql("SELECT size, SUM(cost) FROM costs GROUP BY size").show()
