Pre-Flight Check

A quick sequence of cells to verify that the Spark installation is working.

Adapted from Paco's Pre-Flight Check


In [2]:
import datetime
from pytz import timezone
print "Last run @%s" % (datetime.datetime.now(timezone('US/Pacific')))
#
import sys
print "Python Version : %s" % (sys.version)
#
# spark and sc come predefined when running under the PySpark shell
from pyspark.sql import SparkSession
print "Spark Version  : %s" % (spark.version)

from pyspark.context import SparkContext
print "Spark Version  : %s" % (sc.version)
#
from pyspark.conf import SparkConf
conf = SparkConf()
print conf.toDebugString()


Last run @2016-08-14 16:50:16.256951-07:00
Python Version : 2.7.12 |Continuum Analytics, Inc.| (default, Jun 29 2016, 11:09:23) 
[GCC 4.2.1 (Based on Apple Inc. build 5658) (LLVM build 2336.11.00)]
Spark Version  : 2.0.0
Spark Version  : 2.0.0
spark.app.name=PySparkShell
spark.master=local[*]
spark.submit.deployMode=client
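
The spark and sc handles used above come predefined in the PySpark shell. When the notebook runs outside the shell, a session can be built explicitly; a minimal sketch (the app name is illustrative):

In [ ]:
from pyspark.sql import SparkSession
# Build (or reuse) a local session when the shell hasn't injected one
spark = SparkSession.builder \
    .appName("PreFlightCheck") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext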

In [3]:
data = xrange(1, 101)   # 1..100; xrange is Python 2 (use range in Python 3)

In [4]:
data_rdd = sc.parallelize(data)   # distribute the sequence as an RDD

In [5]:
data_rdd.take(3)


Out[5]:
[1, 2, 3]

In [6]:
# Make sure RDD transformations and actions work
data_rdd.filter(lambda x: x < 10).collect()


Out[6]:
[1, 2, 3, 4, 5, 6, 7, 8, 9]

In [6]:
data_rdd.top(5)


Out[6]:
[100, 99, 98, 97, 96]
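
top() returns the largest elements; its ascending counterpart in the same RDD API is takeOrdered. A quick check:

In [ ]:
data_rdd.takeOrdered(5)   # smallest five; expect [1, 2, 3, 4, 5]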

In [7]:
# Move on to dataFrames
df = data_rdd.map(lambda x: [x]).toDF(['SeqNo'])   # toDF needs each row as a list (or tuple/Row)

In [8]:
df.show(10)


+-----+
|SeqNo|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+
only showing top 10 rows
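
The same DataFrame can also be built without the intermediate RDD, via createDataFrame on the SparkSession; a sketch (df2 is just an illustrative name):

In [ ]:
# Build the DataFrame directly from a list of one-element tuples
df2 = spark.createDataFrame([(x,) for x in data], ['SeqNo'])
df2.show(3)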


In [9]:
df.filter(df.SeqNo <= 10).show()


+-----+
|SeqNo|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+


In [10]:
import pyspark.sql.functions as F
df.withColumn("Square",F.pow(df.SeqNo,2)).show(10) # Normal pow doesn't take columns


+-----+------+
|SeqNo|Square|
+-----+------+
|    1|   1.0|
|    2|   4.0|
|    3|   9.0|
|    4|  16.0|
|    5|  25.0|
|    6|  36.0|
|    7|  49.0|
|    8|  64.0|
|    9|  81.0|
|   10| 100.0|
+-----+------+
only showing top 10 rows
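
Besides F.pow, a plain Python function can be wrapped as a UDF and applied to a column; a minimal sketch (square_udf is an illustrative name):

In [ ]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
square_udf = udf(lambda x: x * x, LongType())   # wrap a plain Python function for column use
df.withColumn("Square", square_udf(df.SeqNo)).show(3)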


In [11]:
# Reduce vs fold: reduce() has no zero value, so it fails on an empty RDD;
# fold() takes an explicit zero and succeeds
rdd_1 = sc.parallelize([])   # an empty RDD

In [12]:
rdd_1.reduce(lambda a,b : a+b)


---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-12-db543e4a676c> in <module>()
----> 1 rdd_1.reduce(lambda a,b : a+b)

/Users/ksankar/Downloads/spark-1.6.0/python/pyspark/rdd.pyc in reduce(self, f)
    798         if vals:
    799             return reduce(f, vals)
--> 800         raise ValueError("Can not reduce() empty RDD")
    801 
    802     def treeReduce(self, f, depth=2):

ValueError: Can not reduce() empty RDD
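
reduce() has no zero value to fall back on, hence the ValueError on an empty RDD; fold(), tried below, takes an explicit zero. If reduce() is still wanted, guarding with isEmpty() sidesteps the exception; a sketch:

In [ ]:
# Guard reduce() so an empty RDD yields 0 instead of raising ValueError
0 if rdd_1.isEmpty() else rdd_1.reduce(lambda a, b: a + b)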

In [ ]:
rdd_1.take(10)   # take() is safe on an empty RDD; returns []

In [ ]:
rdd_1.fold(0, lambda a, b: a + b)   # returns the zero value, 0, on the empty RDD

In [ ]:
rdd_2 = sc.parallelize([1,2])

In [ ]:
from operator import add
rdd_2.fold(0, add)   # 0 is the identity for add, so this is simply 1 + 2 = 3
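
One caveat with fold(): the zero value is applied once per partition and once more when the partition results are combined, so a non-identity zero inflates the result. A sketch with an explicit partition count:

In [ ]:
# 4 partitions: zero value 1 is used once per partition plus once at the
# combine step, so the result is 1 + 2 + 5*1 = 8, not 4
sc.parallelize([1, 2], 4).fold(1, add)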

In [ ]:
rdd_x = sc.parallelize(['a', 'b', 'c'])
rdd_y = sc.parallelize([1, 2, 3])

In [ ]:
rdd_x.cartesian(rdd_y).take(20)   # all 3 x 3 = 9 (letter, number) pairs
