This example shows how the Pilot-Abstraction is used to spawn a Spark job inside a YARN cluster. We show how to combine the Pilot and Spark programming models using several examples.
The following code shows how the Pilot-Abstraction is used to connect to an existing YARN cluster and start Spark.
In [26]:
%matplotlib inline
%run ../env.py
%run ../util/init_spark.py
import os
from pilot_hadoop import PilotComputeService as PilotSparkComputeService

# Pilot description: the YARN cluster to connect to and the number of Spark processes
pilotcompute_description = {
    "service_url": "yarn-client://yarn-aws.radical-cybertools.org",
    "number_of_processes": 2
}

print("SPARK HOME: %s" % os.environ["SPARK_HOME"])
print("PYTHONPATH: %s" % os.environ["PYTHONPATH"])

# Create the Pilot (this starts Spark inside YARN) and obtain a SparkContext from it
pilot_spark = PilotSparkComputeService.create_pilot(pilotcompute_description=pilotcompute_description)
sc = pilot_spark.get_spark_context()
After the Spark application has been submitted, it can be monitored via the YARN web interface: http://yarn-aws.radical-cybertools.org:8088/. The following command prints the tracking URL of the Spark application currently running in YARN.
In [27]:
# List the Spark applications currently running in YARN and print their tracking URLs
output = !yarn application -list -appTypes Spark -appStates RUNNING
print_application_url(output)
Out[27]:
In [28]:
# Load the NASA web server log dataset from HDFS and count the number of log lines
text_rdd = sc.textFile("/data/nasa/")
text_rdd.count()
Out[28]:
Word Count: How many words?
In [29]:
# Word count: split lines into words, map each word to (word, 1), and sum the counts per word
text_rdd.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y).take(10)
Out[29]:
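The query above lists counts for individual words. To answer the literal question, i.e. the total number of words, the flattened RDD can simply be counted; a small sketch, not part of the original run:
# Total number of words across all log lines
text_rdd.flatMap(lambda line: line.split(" ")).count()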
HTTP Response Code Count: How many HTTP errors did we observe?
In [18]:
# Extract the HTTP response code (second-to-last field of each line) and count the occurrences of each code
text_rdd = sc.textFile("/data/nasa/")
text_rdd.filter(lambda x: len(x) > 8).map(lambda x: (x.split()[-2], 1)).reduceByKey(lambda x, y: x + y).collect()
Out[18]:
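The result above counts every response code. To answer the question about errors directly, one could additionally filter for 4xx/5xx status codes; a small sketch, not part of the original notebook:
# Keep only HTTP error responses (status codes >= 400) before counting
text_rdd.filter(lambda x: len(x) > 8) \
        .map(lambda x: x.split()[-2]) \
        .filter(lambda code: code.isdigit() and int(code) >= 400) \
        .map(lambda code: (code, 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .collect()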
Compare the number of lines of code needed to implement the same functionality in MapReduce and in Spark. Which approach requires more code?
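As a rough baseline for this comparison, a plain MapReduce word count, sketched here as a Hadoop Streaming job (mapper.py and reducer.py are hypothetical scripts, not part of this tutorial), would look roughly like this:
# mapper.py -- emit (word, 1) for every word read from stdin
import sys
for line in sys.stdin:
    for word in line.strip().split(" "):
        print("%s\t%d" % (word, 1))

# reducer.py -- sum the counts per word (input arrives sorted by key)
import sys
current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t", 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print("%s\t%d" % (current_word, current_count))
        current_word, current_count = word, int(count)
if current_word is not None:
    print("%s\t%d" % (current_word, current_count))
Even this stripped-down version needs two separate scripts plus a Hadoop Streaming job submission, whereas the Spark version above is a single chained expression.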
DataFrames are an abstraction for reasoning about structured data at a high level: data can easily be filtered, aggregated, and combined. DataFrames can also be used for machine learning tasks, as sketched below.
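As a minimal illustration of the machine-learning use case, a DataFrame with "label" and "features" columns can be fed directly to a pyspark.ml estimator. This is a toy sketch with made-up data, assuming the Spark ML pipeline API is available on the cluster; it is not part of the log-analytics workflow below.
# Toy example: fit a logistic regression on a hand-made DataFrame (illustrative only)
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

sqlContext = SQLContext(sc)  # also created in the workflow below
training = sqlContext.createDataFrame([
    (1.0, Vectors.dense(0.0, 1.1, 0.1)),
    (0.0, Vectors.dense(2.0, 1.0, -1.0)),
    (0.0, Vectors.dense(2.0, 1.3, 1.0)),
    (1.0, Vectors.dense(0.0, 1.2, -0.5))], ["label", "features"])

lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(training)
model.transform(training).select("label", "prediction").show()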
In the following commands, we transform the unstructured log data into a structured DataFrame consisting of three columns: referer, timestamp, and response code. We then sample and view the data.
In [30]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Drop malformed (very short) lines and take a small sample for inspection
text_filtered = text_rdd.filter(lambda x: len(x) > 8)
logs = text_filtered.top(20)

# Split each line into (referer, timestamp, requested resource, response code)
cleaned = text_filtered.map(lambda l: (l.split(" ")[0], l.split(" ")[3][1:], l.split(" ")[6], l.split(" ")[-2]))

# Build Row objects, convert them into a DataFrame and register it as table "row"
rows = cleaned.map(lambda l: Row(referer=l[0], ts=l[1], response_code=l[3]))
schemaLog = sqlContext.createDataFrame(rows)
schemaLog.registerTempTable("row")
In [31]:
schemaLog.show()
In [32]:
# Aggregate: number of log entries per HTTP response code
df = sqlContext.sql("select response_code, count(*) as count from row group by response_code")
In [33]:
df.show()
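The same aggregation can also be expressed with the DataFrame API instead of SQL; a one-line sketch:
# Equivalent aggregation via the DataFrame API
schemaLog.groupBy("response_code").count().show()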
Spark DataFrames interoperate with Pandas DataFrames. Small results can be further processed with Pandas and other Python tools, e.g. Matplotlib and Bokeh for plotting.
In [34]:
# Collect the small aggregated result into a Pandas DataFrame
pdf = df.toPandas()
In [35]:
%matplotlib inline
# Scale the counts to thousands and plot them as a horizontal bar chart
pdf['count'] = pdf['count'] / 1000
pdf.plot(x='response_code', y='count', kind='barh')
Out[35]:
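For interactive output, the same Pandas DataFrame could also be handed to Bokeh; a minimal sketch, assuming a recent Bokeh release with the bokeh.plotting interface:
# Render the per-response-code counts as an interactive Bokeh bar chart
from bokeh.io import output_notebook, show
from bokeh.plotting import figure

output_notebook()  # display plots inline in the notebook
codes = pdf['response_code'].astype(str).tolist()
p = figure(x_range=codes, title="Requests per HTTP response code (thousands)")
p.vbar(x=codes, top=pdf['count'], width=0.8)
show(p)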
In [36]:
# Shut down the Spark application and release its YARN resources
pilot_spark.cancel()