After submitting a job we need a way to retrieve its result, which can be stored in HDFS or elsewhere. Depending on the output size this may or may not be convenient. If we do store it, we need to write it in a format that can be read back afterwards. This notebook tries three options: plain text, JSON, and Avro.
To produce this test I'm using Spark 1.5.1 (PySpark 1.5.1) with the spark-avro library, loaded like this:
spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 [...]
For Spark 1.3.0, use:
spark-submit --packages com.databricks:spark-avro_2.10:1.0.0 [...]
An example for Spark 1.3.0 is provided in a separate file.
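The same --packages flag also works for an interactive session (a minimal sketch; it assumes the artifact can be fetched from Maven Central):
pyspark --packages com.databricks:spark-avro_2.10:2.0.1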
In [2]:
# Is the SparkContext already loaded?
sc
Out[2]:
In [3]:
# Make sure you have a HiveContext
sqlContext
Out[3]:
In [4]:
# Which Spark version are we running?
sc.version
Out[4]:
In [5]:
# Load a DataFrame from Avro files
df = sqlContext.read.format("com.databricks.spark.avro").load("/cms/wmarchive/test/avro/2016/01/01/")
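The load path also accepts glob patterns, which is convenient for reading several days at once (a sketch; the directory layout is assumed from the path above):
In [ ]:
# Hypothetical: read the whole month by globbing over the day directories
df_month = sqlContext.read.format("com.databricks.spark.avro") \
    .load("/cms/wmarchive/test/avro/2016/01/*")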
In [14]:
df.printSchema()
In [7]:
%%time
df.count()
Out[7]:
In [9]:
aggregation1 = (
    df.select("steps.performance.cpu")   # one array of cpu performance structs per document
      .rdd                               # switch from DataFrame to an RDD of Rows
      .flatMap(lambda cpuArrayRows: cpuArrayRows[0])  # flatten the arrays into individual cpu Rows
      .map(lambda row: row.asDict())     # Row -> {metric: value} dict
      .flatMap(lambda rowDict: [(k, v) for k, v in rowDict.iteritems()])  # dict -> (metric, value) pairs (iteritems: Python 2)
      .reduceByKey(lambda x, y: x + y)   # sum every metric across all documents
)
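As a sanity check of the (metric, value) aggregation pattern, the same reduceByKey can be run on a tiny in-memory RDD (a toy sketch; the metric names are made up, not taken from the real schema):
In [ ]:
# Toy (metric, value) pairs mimicking the output of the flatMap above
pairs = sc.parallelize([("TotalJobCPU", 10.0), ("TotalJobTime", 5.0),
                        ("TotalJobCPU", 2.0), ("TotalJobTime", 1.0)])
pairs.reduceByKey(lambda x, y: x + y).collect()
# e.g. [('TotalJobCPU', 12.0), ('TotalJobTime', 6.0)] -- order may vary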
In [10]:
%%time
aggregation1.collect()
Out[10]:
In [ ]:
# Store the result as a plain text file
aggregation1.saveAsTextFile("wmarchive/test-plaintext-aggregation1")
In [11]:
%%bash
hadoop fs -text wmarchive/test-plaintext-aggregation1/*
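Reading the plain-text result back means parsing each line ourselves, since saveAsTextFile just writes the repr() of every tuple (a minimal sketch with ast.literal_eval; it assumes all values are plain Python literals, as they are here):
In [ ]:
import ast
# Each line looks like (u'metric', value): parse it back into a tuple
restored = sc.textFile("wmarchive/test-plaintext-aggregation1") \
             .map(ast.literal_eval)
restored.collect()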
In [ ]:
# Turn the collected (metric, value) pairs into a one-row DataFrame
# with one column per metric
aggregated1DF = sqlContext.createDataFrame([{v[0]: v[1] for v in aggregation1.collect()}])
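PySpark warns that schema inference from a dict is deprecated; an equivalent sketch built from a pyspark.sql.Row avoids the warning:
In [ ]:
from pyspark.sql import Row
# Same one-row DataFrame, built from a Row instead of a dict
aggregated1DF = sqlContext.createDataFrame([Row(**dict(aggregation1.collect()))])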
In [ ]:
# Save in JSON format
aggregated1DF.toJSON().saveAsTextFile("wmarchive/test-json-aggregation1")
In [12]:
%%bash
hadoop fs -text wmarchive/test-json-aggregation1/*
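JSON is the easiest format to read back, since the schema can be inferred directly:
In [ ]:
# Read the JSON result back into a DataFrame, letting Spark infer the schema
restoredJsonDF = sqlContext.read.json("wmarchive/test-json-aggregation1")
restoredJsonDF.printSchema()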
In [ ]:
# Write in Avro format
aggregated1DF.write.format("com.databricks.spark.avro").save("wmarchive/test-avro-aggregation1")
In [13]:
%%bash
hadoop fs -text wmarchive/test-avro-aggregation1/*
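Reading the Avro result back works exactly like the initial load at the top of the notebook:
In [ ]:
# Read the Avro result back through spark-avro
restoredAvroDF = sqlContext.read.format("com.databricks.spark.avro") \
                           .load("wmarchive/test-avro-aggregation1")
restoredAvroDF.printSchema()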