In [18]:
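# findspark locates a local Spark installation and puts pyspark on sys.path,
# so the pyspark imports below work inside a plain Python kernel.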
import findspark
findspark.init()
import sys
from pyspark.sql import SparkSession, functions, types
spark = SparkSession.builder.appName('weather ETL').getOrCreate()
assert sys.version_info >= (3, 4) # make sure we have Python 3.4+
assert spark.version >= '2.1' # make sure we have Spark 2.1+
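# Explicit schema for GHCN-Daily observation records: station ID, date,
# observed element, value, and the measurement/quality/source flags.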
observation_schema = types.StructType([
types.StructField('station', types.StringType(), False),
types.StructField('date', types.StringType(), False),
types.StructField('observation', types.StringType(), False),
types.StructField('value', types.IntegerType(), False),
types.StructField('mflag', types.StringType(), False),
types.StructField('qflag', types.StringType(), False),
types.StructField('sflag', types.StringType(), False),
types.StructField('obstime', types.StringType(), False),
])
def main():
    # Use command-line arguments when run as a script; fall back to
    # notebook-friendly defaults otherwise. (The original read sys.argv
    # unconditionally and then overwrote it, which fails in a notebook.)
    if len(sys.argv) >= 3:
        in_directory, out_directory = sys.argv[1], sys.argv[2]
    else:
        in_directory, out_directory = "weather-1", "output"

    weather = spark.read.csv(in_directory, schema=observation_schema)
    # Keep records that passed quality checks (no qflag), come from Canadian
    # stations (IDs starting with "CA"), and report daily maximum temperature.
    filtered_weather = weather.where(
        (weather.qflag.isNull()) &
        (weather.station.startswith("CA")) &
        (weather.observation == 'TMAX')
    )
    # GHCN stores temperatures in tenths of a degree Celsius; divide by 10
    # to get degrees Celsius.
    cleaned_data = filtered_weather.select(
        filtered_weather.station,
        filtered_weather.date,
        (filtered_weather.value / 10).alias("tmax"),
    )
    cleaned_data.show()

    # Write the result as a directory of gzipped JSON part files,
    # replacing any previous output.
    cleaned_data.write.json(out_directory, compression='gzip', mode='overwrite')
if __name__ == '__main__':
    main()
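A quick way to sanity-check the ETL is to read the output back with the same session; Spark reads the gzipped JSON part files transparently. A minimal sketch, assuming the default "output" directory used above:

In [ ]:
# Read the gzipped JSON output back and inspect the schema and a few rows.
result = spark.read.json("output")
result.printSchema()
result.show(5)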