In [1]:
!module list
In [2]:
!cypress-kinit
!klist
In [3]:
import sys
import os
sys.path.insert(0, '/usr/hdp/current/spark2-client/python')
sys.path.insert(0, '/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip')
os.environ['SPARK_HOME'] = '/usr/hdp/current/spark2-client/'
os.environ['SPARK_CONF_DIR'] = '/etc/hadoop/synced_conf/spark2/'
os.environ['PYSPARK_PYTHON'] = '/software/anaconda3/4.2.0/bin/python'
import pyspark
# Configure Spark to run on YARN with explicit driver/executor resources
conf = pyspark.SparkConf()
conf.setMaster("yarn")
conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "60g")
conf.set("spark.executor.instances", "3")
conf.set("spark.executor.cores", "12")
sc = pyspark.SparkContext(conf=conf)
In [4]:
sc
Out[4]:
In [5]:
textFile = sc.textFile("/repository/gutenberg-shakespeare.txt")
In [6]:
print(textFile)
Storage Level:
In [7]:
textFile.getStorageLevel()
Out[7]:
In [8]:
textFile.getNumPartitions()
Out[8]:
In [9]:
textFile.cache()
Out[9]:
In [10]:
textFile.getStorageLevel()
Out[10]:
Data operations in Spark fall into two categories: transformations and actions.
"All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program." -- Spark Documentation
Transformations (e.g. map, flatMap, filter, reduceByKey): produce a new RDD from an existing one and are evaluated lazily.
Actions (e.g. count, take, collect, saveAsTextFile): return a result to the driver program or write data to storage, which triggers the computation.
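As a minimal sketch of this laziness (using a small in-memory RDD built with sc.parallelize, separate from the Shakespeare data used below), the map transformation is only recorded until the count action forces the computation:

nums = sc.parallelize(range(10))      # small example RDD
squared = nums.map(lambda x: x * x)   # transformation: recorded, nothing computed yet
squared.count()                       # action: runs the job and returns 10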
In [11]:
textFile = sc.textFile("/repository/gutenberg-shakespeare.txt")
In [12]:
textFile
Out[12]:
In [13]:
%%time
textFile.count()
Out[13]:
In [14]:
wordcount = textFile.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
In [15]:
wordcount
Out[15]:
In [16]:
!hdfs dfs -rm -r intro-to-spark
!hdfs dfs -mkdir intro-to-spark
In [17]:
wordcount.saveAsTextFile("intro-to-spark/output-wordcount-01")
In [18]:
!hdfs dfs -cat intro-to-spark/output-wordcount-01/part-00000 \
2>/dev/null | head -n 20
The same word count, built up step by step:
In [19]:
!hdfs dfs -cat /repository/gutenberg-shakespeare.txt \
2>/dev/null | head -n 100
In [20]:
wordcount_step_01 = textFile.flatMap(lambda line: line.split(" "))
In [21]:
wordcount_step_01
Out[21]:
In [22]:
wordcount_step_01.take(20)
Out[22]:
In [23]:
wordcount_step_02 = wordcount_step_01.map(lambda word: (word, 1))
In [24]:
wordcount_step_02.take(20)
Out[24]:
In [25]:
wordcount_step_03 = wordcount_step_02.reduceByKey(lambda a, b: a + b)
In [26]:
wordcount_step_03.take(20)
Out[26]:
To shut down the Spark application and release its resources on the cluster, call sc.stop():
In [27]:
sc.stop()