PySpark for Data Analysis

Different ways of creating RDD

  • parallelize
  • read data from file
  • apply transformation to some existing RDDs

Basic Operations


In [1]:
from pyspark import SparkContext 
sc = SparkContext()

In [2]:
int_RDD = sc.parallelize(range(10), 3)

int_RDD


Out[2]:
PythonRDD[1] at RDD at PythonRDD.scala:43

In [3]:
int_RDD.collect()


Out[3]:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [4]:
int_RDD.glom().collect()


Out[4]:
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

Reading data from a text file

To read data from a local file, you need to specify the address by file://

   textFile("file:///home/vahid/examplefile.txt")

But if the file is on HDFS, then we can specify the address by

    textFile("/user/wordcount/input/examplefile.txt")

In [5]:
text = sc.textFile('file:///home/vahid/Github/DataScience/bigdata-platforms/data/31987-0.txt')

## read the first line
text.take(1)


Out[5]:
['The Project Gutenberg EBook of Territory in Bird Life, by H. Eliot Howard']

Take the first k elements (lines)

text.take(k)

In [6]:
text.take(3)


Out[6]:
['The Project Gutenberg EBook of Territory in Bird Life, by H. Eliot Howard',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with']

Narrow Transformation

  • map: applies a function to each element of RDD.

  • flatMap: similar to map, except that here we can have 0 or more outputs for each element

  • filter: apply a boolean function to each element of RDD, resulting in filtering out based on that function


In [7]:
example = sc.textFile('data/example.txt')

# print the first line to make sure it's working
print(example.take(1))


['Love looks not with the eyes, but with the mind; and therefore is winged Cupid painted blind.']

In [8]:
def lower(line):
    return(line.lower())

# apply lower() to each element:
example.map(lower).take(1)


Out[8]:
['love looks not with the eyes, but with the mind; and therefore is winged cupid painted blind.']

In [9]:
def split(line):
    return(line.split())

# apply split to each element, resulting in 0-more outputs --> flatMap
example.flatMap(split).take(5)


Out[9]:
['Love', 'looks', 'not', 'with', 'the']

In [10]:
def create_keyval(word):
    return(word, 1)

# Create key-value pairs for each split element --> map
example.flatMap(split).map(create_keyval).take(5)


Out[10]:
[('Love', 1), ('looks', 1), ('not', 1), ('with', 1), ('the', 1)]

In [11]:
def filterlen(word):
    return(len(word)>5)

# filter split elements based on their character lengths
example.flatMap(split).filter(filterlen).collect()


Out[11]:
['therefore',
 'winged',
 'painted',
 'blind.',
 'dreams',
 'little',
 'rounded',
 'sleep.']

Wide Transformation

  • groupByKey:
  • reduceByKey:
  • repartition

In [12]:
pairs_RDD = example.flatMap(split).map(create_keyval)

for key,vals in pairs_RDD.groupByKey().take(5):
    print(key, list(vals))


on, [1]
life, [1]
blind. [1]
none. [1]
painted [1]

In [13]:
def sumvals(a, b):
    return (a + b)

pairs_RDD.reduceByKey(sumvals).take(10)


Out[13]:
[('on,', 1),
 ('life,', 1),
 ('blind.', 1),
 ('none.', 1),
 ('painted', 1),
 ('rounded', 1),
 ('is', 2),
 ('but', 1),
 ('Love', 2),
 ('do', 1)]

Heterogenous Data Types


In [17]:
het_RDD = sc.parallelize(
    [['Alex', 23, 'CSE', 3.87],
     ['Bob',  24, 'ECE', 3.73],
     ['Max',  26, 'BCH', 3.44],
     ['Nikole', 25, 'CSE', 3.75],
     ['Jane', 22, 'ECE', 3.65],
     ['John', 22, 'BCH', 3.55]]
)

print(het_RDD.take(1))

het_RDD.collect()


[['Alex', 23, 'CSE', 3.87]]
Out[17]:
[['Alex', 23, 'CSE', 3.87],
 ['Bob', 24, 'ECE', 3.73],
 ['Max', 26, 'BCH', 3.44],
 ['Nikole', 25, 'CSE', 3.75],
 ['Jane', 22, 'ECE', 3.65],
 ['John', 22, 'BCH', 3.55]]

Find the max. grade for each department


In [21]:
def extract_dept_grade(row):
    return(row[2], row[3])

## apply extract_dept_grade function to each element
dept_grade_RDD = het_RDD.map(extract_dept_grade)
print(dept_grade_RDD.collect())


[('CSE', 3.87), ('ECE', 3.73), ('BCH', 3.44), ('CSE', 3.75), ('ECE', 3.65), ('BCH', 3.55)]

In [22]:
## find the max. for each dept
dept_grade_RDD.reduceByKey(max).collect()


Out[22]:
[('BCH', 3.55), ('CSE', 3.87), ('ECE', 3.73)]

Dataframes in Pyspark


In [1]:
from pyspark import SparkContext 
sc = SparkContext()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
df = sqlContext.read.json('data/example-datafrae.json')

df.show()


+----+----+-----+--------+
| age| gpa|major|    name|
+----+----+-----+--------+
|  22|3.88|  CSE|    Andy|
|null|3.18| null|    Mike|
|null|3.58|  ECE|     Bob|
|  22|3.45|   ME|    John|
|  23| 3.8|  ECE|    Alex|
|  23|3.64|  CSE| Jenifer|
|  22|3.58|  CSE|Nicholas|
+----+----+-----+--------+


In [3]:
df.printSchema()


root
 |-- age: long (nullable = true)
 |-- gpa: double (nullable = true)
 |-- major: string (nullable = true)
 |-- name: string (nullable = true)


In [4]:
df.select("name").show()


+--------+
|    name|
+--------+
|    Andy|
|    Mike|
|     Bob|
|    John|
|    Alex|
| Jenifer|
|Nicholas|
+--------+


In [6]:
df.groupBy("major").count().show()


+-----+-----+
|major|count|
+-----+-----+
|  ECE|    2|
| null|    1|
|   ME|    1|
|  CSE|    3|
+-----+-----+


In [8]:
df.groupBy("major").mean("gpa").show()


+-----+------------------+
|major|          avg(gpa)|
+-----+------------------+
|  ECE|              3.69|
| null|              3.18|
|   ME|              3.45|
|  CSE|3.7000000000000006|
+-----+------------------+


In [10]:
df.groupBy("major").max("gpa").show()


+-----+--------+
|major|max(gpa)|
+-----+--------+
|  ECE|     3.8|
| null|    3.18|
|   ME|    3.45|
|  CSE|    3.88|
+-----+--------+

Appendix: Installing Spark on Ubuntu

Dowload and extract the spark package from

tar xvfz spark-1.5.2-bin-hadoop2.6.tgz
sudo mv spark-1.5.2-bin-hadoop2.6 $HOME/apps/spark/
cd $HOME/apps/spark/

Now, we need to add the SPARK_HOME location to the PATH environment variable

export SPARK_HOME=$HOME/apps/spark 
export PATH=$SPARK_HOME/bin:$PATH

Now, you can launch pyspark by pyspark

Reduce the verbosity level

By default, pyspark will generate lots of log messages when you run some command, and we can see how that can be a problem. To reduce the verbosity, copy the template file in the conf folder

cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties

and edit it by replacing the INFO to WARN.

Using pyspark in iPython

In order to use pyspark in an iPython notebook, you need to configure it by adding a new file in the startup directory of ipython profile.

vim $HOME/.ipython/profile_default/startup/00-pyspark-setup.py

and add these contents in this file:


In [ ]:
import os
import sys

# Configure the environment
if 'SPARK_HOME' not in os.environ:
    home_folder = os.environ['HOME']
    os.environ['SPARK_HOME'] = os.path.join(home_folder, 'apps/spark')

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

Now, you should be able to run ipython and use pyspark. Try running the following commands:


In [1]:
print(SPARK_HOME)


/home/vahid/apps/spark

In [2]:
from pyspark import SparkContext

sc = SparkContext( 'local', 'pyspark')

sc.parallelize(range(10), 3)


Out[2]:
PythonRDD[1] at RDD at PythonRDD.scala:43

In case you received an error for py4j, then you also need to run these two commands in bash shell

export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH