In [1]:
from pyspark import SparkContext
sc = SparkContext()
In [2]:
int_RDD = sc.parallelize(range(10), 3)
int_RDD
Out[2]:
In [3]:
int_RDD.collect()
Out[3]:
In [4]:
int_RDD.glom().collect()
Out[4]:
In [5]:
text = sc.textFile('file:///home/vahid/Github/DataScience/bigdata-platforms/data/31987-0.txt')
## read the first line
text.take(1)
Out[5]:
In [6]:
text.take(3)
Out[6]:
In [7]:
example = sc.textFile('data/example.txt')
# print the first line to make sure it's working
print(example.take(1))
In [8]:
def lower(line):
    return(line.lower())
# apply lower() to each element:
example.map(lower).take(1)
Out[8]:
In [9]:
def split(line):
    return(line.split())
# apply split to each element, producing zero or more outputs per element --> flatMap
example.flatMap(split).take(5)
Out[9]:
In [10]:
def create_keyval(word):
    return(word, 1)
# Create key-value pairs for each split element --> map
example.flatMap(split).map(create_keyval).take(5)
Out[10]:
In [11]:
def filterlen(word):
    return(len(word) > 5)
# filter split elements based on their character lengths
example.flatMap(split).filter(filterlen).collect()
Out[11]:
In [12]:
pairs_RDD = example.flatMap(split).map(create_keyval)
for key, vals in pairs_RDD.groupByKey().take(5):
    print(key, list(vals))
In [13]:
def sumvals(a, b):
    return(a + b)
pairs_RDD.reduceByKey(sumvals).take(10)
Out[13]:
In [17]:
het_RDD = sc.parallelize(
    [['Alex', 23, 'CSE', 3.87],
     ['Bob', 24, 'ECE', 3.73],
     ['Max', 26, 'BCH', 3.44],
     ['Nikole', 25, 'CSE', 3.75],
     ['Jane', 22, 'ECE', 3.65],
     ['John', 22, 'BCH', 3.55]]
)
print(het_RDD.take(1))
het_RDD.collect()
Out[17]:
In [21]:
def extract_dept_grade(row):
    return(row[2], row[3])
## apply extract_dept_grade function to each element
dept_grade_RDD = het_RDD.map(extract_dept_grade)
print(dept_grade_RDD.collect())
In [22]:
## find the max. grade for each dept
dept_grade_RDD.reduceByKey(max).collect()
Out[22]:
In [1]:
from pyspark import SparkContext
sc = SparkContext()
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
In [2]:
df = sqlContext.read.json('data/example-datafrae.json')
df.show()
In [3]:
df.printSchema()
In [4]:
df.select("name").show()
In [6]:
df.groupBy("major").count().show()
In [8]:
df.groupBy("major").mean("gpa").show()
In [10]:
df.groupBy("major").max("gpa").show()
Now, we need to add the SPARK_HOME location to the PATH environment variable
export SPARK_HOME=$HOME/apps/spark
export PATH=$SPARK_HOME/bin:$PATH
Now, you can launch pyspark by running the pyspark command.
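For example, a quick way to verify that the shell started correctly is to use the SparkContext that pyspark automatically creates for you as sc (a minimal check; the startup banner and exact output depend on your Spark version):
sc.parallelize([1, 2, 3]).count()   # should return 3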
By default, pyspark generates a lot of log messages whenever you run a command, which can quickly become a problem. To reduce the verbosity, copy the template file in the conf folder:
cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
and edit it, replacing INFO with WARN.
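For reference, the root logging setting in the copied file should end up looking roughly like this (assuming your template uses the standard rootCategory line with a console appender; templates differ slightly across Spark versions):
log4j.rootCategory=WARN, console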
In order to use pyspark in an IPython notebook, you need to configure it by adding a new file to the startup directory of your IPython profile:
vim $HOME/.ipython/profile_default/startup/00-pyspark-setup.py
and add the following contents to this file:
In [ ]:
import os
import sys
# Configure the environment
if 'SPARK_HOME' not in os.environ:
    home_folder = os.environ['HOME']
    os.environ['SPARK_HOME'] = os.path.join(home_folder, 'apps/spark')
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
Now, you should be able to run ipython and use pyspark. Try running the following commands:
In [1]:
print(SPARK_HOME)
In [2]:
from pyspark import SparkContext
sc = SparkContext('local', 'pyspark')
sc.parallelize(range(10), 3)
Out[2]:
If you receive an error related to py4j, you also need to run these two commands in the bash shell:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH