# In [18]:
import findspark
findspark.init()

import sys
from pyspark.sql import SparkSession, functions, types

# Create (or reuse) the Spark session shared by the whole job.
spark = SparkSession.builder.appName('weather ETL').getOrCreate()

assert sys.version_info >= (3, 4)  # make sure we have Python 3.4+
# Compare Spark's version numerically on (major, minor). A plain string
# comparison is lexicographic, so e.g. '10.0' >= '2.1' would be False.
assert tuple(int(part) for part in spark.version.split('.')[:2]) >= (2, 1), \
    'Spark 2.1+ required'  # make sure we have Spark 2.1+

# Explicit schema for the headerless daily-observation CSV input read by main().
# The column layout (station id, YYYYMMDD date, element code such as 'TMAX',
# integer value, three flag columns, observation time) looks like NOAA
# GHCN-Daily — confirm against the data source.
#
# The flag and obstime columns are declared nullable. main() keeps only rows
# where qflag is null, so qflag must be allowed to be empty. The other flag
# columns are presumably optional in the same way (TODO confirm). Spark's file
# sources generally relax declared nullability anyway, so this makes the
# declaration match the data rather than changing how rows are read.
observation_schema = types.StructType([
    types.StructField('station', types.StringType(), False),
    types.StructField('date', types.StringType(), False),
    types.StructField('observation', types.StringType(), False),
    # main() divides this by 10 to get tmax; presumably stored in tenths.
    types.StructField('value', types.IntegerType(), False),
    types.StructField('mflag', types.StringType(), True),
    types.StructField('qflag', types.StringType(), True),
    types.StructField('sflag', types.StringType(), True),
    types.StructField('obstime', types.StringType(), True),
])


def main(in_directory="weather-1", out_directory="output"):
    """Extract daily maximum temperatures for "CA" stations and write gzipped JSON.

    Reads the CSV files at *in_directory* using ``observation_schema``. It keeps
    rows that have no quality flag, whose station id starts with "CA", and whose
    observation is 'TMAX'. It then writes ``station``, ``date`` and ``tmax``
    (``value / 10``) as gzip-compressed JSON to *out_directory*, overwriting any
    existing output there.

    Args:
        in_directory: Input path passed to ``spark.read.csv``.
        out_directory: Output directory for the JSON files.
    """
    # Paths are parameters instead of sys.argv: inside a notebook kernel,
    # sys.argv holds the launcher's own arguments, not this job's paths.
    weather = spark.read.csv(in_directory, schema=observation_schema)

    # `&` binds tighter than `==`, so the equality test must stay parenthesized.
    filtered_weather = weather.where(
        weather.qflag.isNull()
        & weather.station.startswith("CA")
        & (weather.observation == 'TMAX')
    )

    cleaned_data = filtered_weather.select(
        filtered_weather.station,
        filtered_weather.date,
        (filtered_weather.value / 10).alias("tmax"),
    )

    # Preview for the notebook; this triggers a separate Spark job before the write.
    cleaned_data.show()
    cleaned_data.write.json(out_directory, compression='gzip', mode='overwrite')


if __name__ == '__main__':
    # Entry point when executed directly (script run or notebook cell).
    main()


# Output of cleaned_data.show():
# +-----------+--------+----+
# |    station|    date|tmax|
# +-----------+--------+----+
# |CA004016322|20161203|-0.5|
# |CA001096629|20161203| 1.7|
# |CA008403399|20161203|-0.2|
# |CA006016529|20161203|-5.7|
# |CA001175122|20161203| 2.0|
# |CA003075601|20161203| 2.1|
# |CA001035614|20161203| 7.5|
# |CA001160515|20161203| 3.6|
# |CA003051R4R|20161203| 3.8|
# |CA006010740|20161203|-5.7|
# |CA003014195|20161203| 3.1|
# |CA007093714|20161203|-9.1|
# |CA00401HP5R|20161203| 1.1|
# |CA00707DBD4|20161203|-0.4|
# |CA008205774|20161203| 4.0|
# |CA003076069|20161203|-0.9|
# |CA001018935|20161203|10.0|
# |CA008204909|20161203| 4.1|
# |CA004010879|20161203|-1.2|
# |CA001184791|20161203| 2.4|
# +-----------+--------+----+
# only showing top 20 rows