In [1]:
    
!module list
    
    
In [2]:
    
!cypress-kinit
!klist
    
    
In [3]:
    
import sys
import os

# Make the Spark 2 client libraries and py4j visible to this Python kernel
sys.path.insert(0, '/usr/hdp/current/spark2-client/python')
sys.path.insert(0, '/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip')

# Point Spark at the cluster's Spark home, synced configuration, and Python interpreter
os.environ['SPARK_HOME'] = '/usr/hdp/current/spark2-client/'
os.environ['SPARK_CONF_DIR'] = '/etc/hadoop/synced_conf/spark2/'
os.environ['PYSPARK_PYTHON'] = '/software/anaconda3/4.2.0/bin/python'

import pyspark

# Run on YARN with 3 executors, each with 12 cores and 60 GB of memory
conf = pyspark.SparkConf()
conf.setMaster("yarn")
conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "60g")
conf.set("spark.executor.instances", "3")  # "spark.num.executors" is not a valid Spark property
conf.set("spark.executor.cores", "12")
sc = pyspark.SparkContext(conf=conf)
    
In [4]:
    
sc
    
    Out[4]:
In [5]:
    
textFile = sc.textFile("/repository/gutenberg-shakespeare.txt")
    
In [6]:
    
print(textFile)
    
    
Storage Level:
In [7]:
    
textFile.getStorageLevel()
    
    Out[7]:
In [8]:
    
textFile.getNumPartitions()
    
    Out[8]:
In [9]:
    
textFile.cache()
    
    Out[9]:
In [10]:
    
textFile.getStorageLevel()
    
    Out[10]:
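cache() is shorthand for persisting the RDD at the default memory-only storage level. If the data might not fit in memory, a different level can be requested explicitly. The sketch below is not part of the original session; it assumes the RDD was already cached, so it is unpersisted first because a storage level can only be assigned once:

from pyspark import StorageLevel

# A storage level can only be set once, so drop the cached copy first
textFile.unpersist()
# Keep partitions in memory and spill to disk when memory runs out
textFile.persist(StorageLevel.MEMORY_AND_DISK)
textFile.getStorageLevel()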
Data operations in Spark fall into two groups: transformations and actions.
"All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program." -- Spark Documentation
Transformations used below: flatMap, map, reduceByKey
Actions used below: count, take, saveAsTextFile
A short illustration of this lazy behavior follows.
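As a minimal sketch of the laziness described above (not from the original notebook; the search term "Lear" is only an illustration), the filter below returns immediately without reading any data, and the file is only scanned when count() is called:

# Transformation: Spark only records the lineage, no data is read yet
lear_lines = textFile.filter(lambda line: "Lear" in line)
# Action: this triggers a job that reads the file and counts the matching lines
lear_lines.count()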
In [11]:
    
textFile = sc.textFile("/repository/gutenberg-shakespeare.txt")
    
In [12]:
    
textFile
    
    Out[12]:
In [13]:
    
%%time
textFile.count()
    
    
    Out[13]:
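Because textFile was re-created above, this count reads the file from HDFS again. A rough sketch (added here, not part of the original session) of how caching pays off on repeated actions:

# Cache the RDD, then run the same action twice;
# the first count materializes the cache, the second reads from memory
textFile.cache()
textFile.count()
textFile.count()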
In [14]:
    
# Split each line into words, pair each word with a count of 1,
# then sum the counts for every distinct word
wordcount = textFile.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
    
In [15]:
    
wordcount
    
    Out[15]:
In [16]:
    
!hdfs dfs -rm -r intro-to-spark
!hdfs dfs -mkdir intro-to-spark
    
    
In [17]:
    
wordcount.saveAsTextFile("intro-to-spark/output-wordcount-01")
    
In [18]:
    
!hdfs dfs -cat intro-to-spark/output-wordcount-01/part-00000 \
    2>/dev/null | head -n 20
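saveAsTextFile writes one part-NNNNN file per partition of the RDD. If a single output file is preferred, the RDD can be coalesced to one partition before saving (a sketch; the output path here is only an example):

# Collapse the result to a single partition so only one part file is written
wordcount.coalesce(1).saveAsTextFile("intro-to-spark/output-wordcount-single")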
    
    
The same word count, one step at a time:
In [19]:
    
!hdfs dfs -cat /repository/gutenberg-shakespeare.txt \
    2>/dev/null | head -n 100
    
    
In [20]:
    
wordcount_step_01 = textFile.flatMap(lambda line: line.split(" "))
    
In [21]:
    
wordcount_step_01
    
    Out[21]:
In [22]:
    
wordcount_step_01.take(20)
    
    Out[22]:
In [23]:
    
wordcount_step_02 = wordcount_step_01.map(lambda word: (word, 1))
    
In [24]:
    
wordcount_step_02.take(20)
    
    Out[24]:
In [25]:
    
wordcount_step_03 = wordcount_step_02.reduceByKey(lambda a, b: a + b)
    
In [26]:
    
wordcount_step_03.take(20)
    
    Out[26]:
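take(20) returns an arbitrary sample of pairs. To see the most frequent words instead, the pairs can be ordered by descending count (a small added sketch, not part of the original walkthrough):

# The 20 most frequent words, ordered by descending count
wordcount_step_03.takeOrdered(20, key=lambda pair: -pair[1])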
To stop the Spark application and release its YARN resources, call sc.stop()
In [27]:
    
sc.stop()