How to write results into HDFS

After submitting a job we need to retrieve the result, which can be stored in HDFS or elsewhere. Depending on the output size this may or may not be a convenient approach. If we do store it in HDFS, we need to write it in some format so that we can read it back afterwards.

To produce this test I'm using Spark 1.5.1 (PySpark 1.5.1) and the spark-avro library, loaded like this:

spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 [...]

For Spark 1.3.0 use:

spark-submit --packages com.databricks:spark-avro_2.10:1.0.0 [...]

An example for Spark 1.3.0 is provided in a separate file.

In [2]:
# is SparkContext already loaded?
sc


Out[2]:
<pyspark.context.SparkContext at 0x7f10d8032650>

In [3]:
# Make sure you have a HiveContext
sqlContext


Out[3]:
<pyspark.sql.context.HiveContext at 0x7f10c96dbe50>

In [4]:
# What is the Spark version?
sc.version


Out[4]:
u'1.5.1'

In [5]:
# load a dataframe from Avro files
df = sqlContext.read.format("com.databricks.spark.avro").load("/cms/wmarchive/test/avro/2016/01/01/")

In [14]:
df.printSchema()


root
 |-- PFNArrayRef: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- task: string (nullable = false)
 |-- skippedFiles: array (nullable = false)
 |    |-- element: long (containsNull = false)
 |-- wmaid: string (nullable = false)
 |-- wmats: double (nullable = false)
 |-- fallbackFiles: array (nullable = false)
 |    |-- element: long (containsNull = false)
 |-- LFNArray: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- meta_data: struct (nullable = false)
 |    |-- agent_ver: string (nullable = false)
 |    |-- fwjr_id: string (nullable = false)
 |    |-- host: string (nullable = false)
 |    |-- ts: long (nullable = false)
 |-- steps: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- status: long (nullable = false)
 |    |    |-- errors: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- type: string (nullable = false)
 |    |    |    |    |-- details: string (nullable = false)
 |    |    |    |    |-- exitCode: long (nullable = false)
 |    |    |-- name: string (nullable = false)
 |    |    |-- output: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- branch_hash: string (nullable = false)
 |    |    |    |    |-- guid: string (nullable = false)
 |    |    |    |    |-- size: long (nullable = false)
 |    |    |    |    |-- applicationName: string (nullable = false)
 |    |    |    |    |-- acquisitionEra: string (nullable = false)
 |    |    |    |    |-- applicationVersion: string (nullable = false)
 |    |    |    |    |-- inputPFNs: array (nullable = false)
 |    |    |    |    |    |-- element: long (containsNull = false)
 |    |    |    |    |-- configURL: string (nullable = false)
 |    |    |    |    |-- outputDataset: string (nullable = false)
 |    |    |    |    |-- location: string (nullable = false)
 |    |    |    |    |-- inputLFNs: array (nullable = false)
 |    |    |    |    |    |-- element: long (containsNull = false)
 |    |    |    |    |-- async_dest: string (nullable = false)
 |    |    |    |    |-- events: long (nullable = false)
 |    |    |    |    |-- merged: long (nullable = false)
 |    |    |    |    |-- validStatus: string (nullable = false)
 |    |    |    |    |-- adler32: string (nullable = false)
 |    |    |    |    |-- ouput_module_class: string (nullable = false)
 |    |    |    |    |-- globalTag: string (nullable = false)
 |    |    |    |    |-- catalog: string (nullable = false)
 |    |    |    |    |-- module_label: string (nullable = false)
 |    |    |    |    |-- cksum: string (nullable = false)
 |    |    |    |    |-- StageOutCommand: string (nullable = false)
 |    |    |    |    |-- outputPFNs: array (nullable = false)
 |    |    |    |    |    |-- element: long (containsNull = false)
 |    |    |    |    |-- inputDataset: string (nullable = false)
 |    |    |    |    |-- runs: array (nullable = false)
 |    |    |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |    |    |-- runNumber: long (nullable = false)
 |    |    |    |    |    |    |-- lumis: array (nullable = false)
 |    |    |    |    |    |    |    |-- element: long (containsNull = false)
 |    |    |    |    |-- outputLFNs: array (nullable = false)
 |    |    |    |    |    |-- element: long (containsNull = false)
 |    |    |    |    |-- processingVer: long (nullable = false)
 |    |    |    |    |-- processingStr: string (nullable = false)
 |    |    |    |    |-- prep_id: string (nullable = false)
 |    |    |-- stop: long (nullable = false)
 |    |    |-- site: string (nullable = false)
 |    |    |-- start: long (nullable = false)
 |    |    |-- input: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- runs: array (nullable = false)
 |    |    |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |    |    |-- runNumber: long (nullable = false)
 |    |    |    |    |    |    |-- lumis: array (nullable = false)
 |    |    |    |    |    |    |    |-- element: long (containsNull = false)
 |    |    |    |    |-- input_source_class: string (nullable = false)
 |    |    |    |    |-- input_type: string (nullable = false)
 |    |    |    |    |-- lfn: long (nullable = false)
 |    |    |    |    |-- pfn: long (nullable = false)
 |    |    |    |    |-- catalog: string (nullable = false)
 |    |    |    |    |-- module_label: string (nullable = false)
 |    |    |    |    |-- guid: string (nullable = false)
 |    |    |    |    |-- events: long (nullable = false)
 |    |    |-- performance: struct (nullable = false)
 |    |    |    |-- storage: struct (nullable = false)
 |    |    |    |    |-- writeTotalMB: double (nullable = false)
 |    |    |    |    |-- readPercentageOps: double (nullable = false)
 |    |    |    |    |-- readAveragekB: double (nullable = false)
 |    |    |    |    |-- readTotalMB: double (nullable = false)
 |    |    |    |    |-- readNumOps: double (nullable = false)
 |    |    |    |    |-- readCachePercentageOps: double (nullable = false)
 |    |    |    |    |-- readMBSec: double (nullable = false)
 |    |    |    |    |-- readMaxMSec: double (nullable = false)
 |    |    |    |    |-- readTotalSecs: double (nullable = false)
 |    |    |    |    |-- writeTotalSecs: double (nullable = false)
 |    |    |    |-- cpu: struct (nullable = false)
 |    |    |    |    |-- TotalJobCPU: double (nullable = false)
 |    |    |    |    |-- AvgEventCPU: double (nullable = false)
 |    |    |    |    |-- MaxEventCPU: double (nullable = false)
 |    |    |    |    |-- AvgEventTime: double (nullable = false)
 |    |    |    |    |-- MinEventCPU: double (nullable = false)
 |    |    |    |    |-- TotalEventCPU: long (nullable = false)
 |    |    |    |    |-- TotalJobTime: double (nullable = false)
 |    |    |    |    |-- MinEventTime: double (nullable = false)
 |    |    |    |    |-- MaxEventTime: double (nullable = false)
 |    |    |    |-- memory: struct (nullable = false)
 |    |    |    |    |-- PeakValueRss: double (nullable = false)
 |    |    |    |    |-- PeakValueVsize: double (nullable = false)
 |-- PFNArray: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- LFNArrayRef: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- stype: string (nullable = false)


In [7]:
%%time
df.count()


CPU times: user 5 ms, sys: 3 ms, total: 8 ms
Wall time: 14.8 s
Out[7]:
200000

Aggregation examples

First example

1) Aggregate the sum of all steps.performance.cpu values. In this case the result is a single line that can easily be stored back into HDFS, even in a textual format.


In [9]:
# sum every cpu metric across all steps of all records
aggregation1 = (df.select("steps.performance.cpu")
    .rdd
    # each row holds the array of per-step cpu structs: flatten it
    .flatMap(lambda cpuArrayRows: cpuArrayRows[0])
    # turn each Row into a plain dict
    .map(lambda row: row.asDict())
    # emit a (metric, value) pair for every cpu field
    .flatMap(lambda rowDict: [(k, v) for k, v in rowDict.iteritems()])
    # add up the values per metric
    .reduceByKey(lambda x, y: x + y))

In [10]:
%%time
aggregation1.collect()


CPU times: user 15 ms, sys: 6 ms, total: 21 ms
Wall time: 19.6 s
Out[10]:
[('TotalJobCPU', 152097347.4204235),
 ('TotalJobTime', 1652265.155986641),
 ('MinEventCPU', 1349112.0),
 ('MinEventTime', 300451.93912530516),
 ('AvgEventCPU', 299587.125072929),
 ('MaxEventTime', 1656177.8017510779),
 ('TotalEventCPU', 151599144),
 ('AvgEventTime', 300340.72760207055),
 ('MaxEventCPU', 1351100.0)]

In [ ]:
# Store the result as a plain text file
aggregation1.saveAsTextFile("wmarchive/test-plaintext-aggregation1")

In [11]:
%%bash
hadoop fs -text wmarchive/test-plaintext-aggregation1/*


('TotalJobCPU', 152097347.4204235)
('TotalJobTime', 1652265.155986641)
('MinEventCPU', 1349112.0)
('MinEventTime', 300451.93912530516)
('AvgEventCPU', 299587.125072929)
('MaxEventTime', 1656177.8017510779)
('TotalEventCPU', 151599144)
('AvgEventTime', 300340.72760207055)
('MaxEventCPU', 1351100.0)
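
Reading this plain-text output back means parsing the Python tuple reprs by hand; a minimal sketch (the path matches the save above; ast.literal_eval is one safe way to parse the tuple strings):

In [ ]:
# read the plain-text result back: each line is the repr of a (metric, value) tuple
import ast
readBack = sc.textFile("wmarchive/test-plaintext-aggregation1").map(ast.literal_eval)
readBack.collect()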

In [ ]:
# build a one-row DataFrame from the aggregated (metric, value) pairs
aggregated1DF = sqlContext.createDataFrame([{v[0]: v[1] for v in aggregation1.collect()}])

In [ ]:
# Save in JSON format
aggregated1DF.toJSON().saveAsTextFile("wmarchive/test-json-aggregation1")

In [12]:
%%bash
hadoop fs -text wmarchive/test-json-aggregation1/*


{"AvgEventCPU":299587.125072929,"AvgEventTime":300340.72760207055,"MaxEventCPU":1351100.0,"MaxEventTime":1656177.8017510779,"MinEventCPU":1349112.0,"MinEventTime":300451.93912530516,"TotalEventCPU":151599144,"TotalJobCPU":1.520973474204235E8,"TotalJobTime":1652265.155986641}

In [ ]:
# Write in Avro format
aggregated1DF.write.format("com.databricks.spark.avro").save("wmarchive/test-avro-aggregation1")

In [13]:
%%bash
hadoop fs -text wmarchive/test-avro-aggregation1/*


{"AvgEventCPU":{"double":299587.125072929},"AvgEventTime":{"double":300340.72760207055},"MaxEventCPU":{"double":1351100.0},"MaxEventTime":{"double":1656177.8017510779},"MinEventCPU":{"double":1349112.0},"MinEventTime":{"double":300451.93912530516},"TotalEventCPU":{"long":151599144},"TotalJobCPU":{"double":1.520973474204235E8},"TotalJobTime":{"double":1652265.155986641}}

Second example

2) Count the occurrences of each host to find the most popular one. A minimal sketch follows in the cell below, assuming the host name lives in meta_data.host as in the schema printed above.


In [ ]:
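# A sketch (an assumption, since the original cell is empty): count how many
# records each host reported, then take the single most frequent one
hostCounts = df.select("meta_data.host") \
    .rdd \
    .map(lambda row: (row[0], 1)) \
    .reduceByKey(lambda x, y: x + y)
hostCounts.takeOrdered(1, key=lambda kv: -kv[1])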