In [1]:
%pylab inline
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from future.builtins import (bytes, str, open, super, range,
                             zip, round, input, int, pow, object)
import sys
if sys.version_info.major == 2:
    # in Python 2 cPickle is much faster than pickle but doesn't deal w/ unicode
    import cPickle as pickle
else:
    # Python 3 loads the faster pickle by default if it's available
    import pickle
import collections
import random
import time
In [2]:
# Print the text that utils.header.create_header builds for `sc`.
# NOTE(review): `sc` is not defined anywhere in this notebook; presumably the
# kernel pre-populates it with the active SparkContext -- confirm.
from utils import header

notebook_header = header.create_header(sc)
print(notebook_header)
In [3]:
def is_ob(ob):
    """Return True when the text line ``ob`` does not contain ``"STN"``.

    Lines containing the marker are rejected; presumably these are the
    column-header rows of the GSOD files (TODO confirm against the data).
    """
    has_marker = "STN" in ob
    return not has_marker
def is_station(station_id, ob):
    """Return True when the observation line ``ob`` belongs to ``station_id``.

    The station id of a line is its first whitespace-delimited field, the
    same field ``obs_by_station`` uses as its dictionary key.  Comparing that
    field exactly (instead of searching for ``station_id`` anywhere in the
    line) prevents false matches where the id merely occurs inside another
    column, e.g. id ``"929010"`` inside the date ``"19290101"``.

    Args:
        station_id: station identifier to look for.
        ob: one raw text line.

    Returns:
        bool: True only if the first field of ``ob`` equals ``station_id``;
        False for blank lines.
    """
    # Split off just the first field; the rest of the line is irrelevant here.
    fields = ob.split(None, 1)
    return bool(fields) and fields[0] == station_id
def obs_by_station(obs_rdd):
    """Group observation lines from a GSOD RDD by station id.

    The lines are fetched with a single ``collect()`` and grouped on the
    driver, rather than running one filter/collect job per station.  The
    total amount of data brought to the driver is the same as before (every
    line ends up in the returned dictionary either way), but the data set is
    scanned once instead of once per station.

    A line is assigned to the station named by its first whitespace-delimited
    field only, so a station id that merely appears inside another column of
    some line no longer pulls that line into the wrong station.

    Args:
        obs_rdd: RDD (or any object with a ``collect()`` method) yielding
            observation text lines.

    Returns:
        dict: station id -> list of that station's lines, in the order
        ``collect()`` returned them.  Blank lines are skipped.
    """
    station_obs = {}
    for line in obs_rdd.collect():
        fields = line.split(None, 1)
        if not fields:
            # A blank line carries no station id; nothing to file it under.
            continue
        station_obs.setdefault(str(fields[0]), []).append(line)
    return station_obs
def obs_by_year(years, base_path="/user/schiefjm/weather/gsod/"):
    """Load GSOD observations for each requested year, grouped by station.

    Args:
        years: iterable of years; each is converted with ``str`` and appended
            to ``base_path`` to form the path handed to ``sc.textFile``.
        base_path: directory holding one entry per year.  Defaults to the
            location this notebook has always read from; a trailing slash is
            optional.

    Returns:
        collections.OrderedDict: year -> OrderedDict of station id -> list of
        observation lines, with years in the order they were given.

    Relies on the module-level SparkContext ``sc`` and on the sibling helpers
    ``is_ob`` and ``obs_by_station``.
    """
    # Normalise so the path is well-formed with or without a trailing slash.
    root = base_path.rstrip("/")
    years_of_obs = collections.OrderedDict()
    for year in years:
        # Drop the lines is_ob rejects before grouping.
        obs_rdd = sc.textFile(root + "/" + str(year)).filter(is_ob)
        years_of_obs[year] = collections.OrderedDict(obs_by_station(obs_rdd))
    return years_of_obs
In [4]:
# Load the observations for 1929 only (range end is exclusive), keyed
# year -> station id -> lines.
sample_years = list(range(1929, 1930))
structured_obs = obs_by_year(sample_years)
# Cell output: sys.getsizeof is shallow, so this is the size of the outer
# OrderedDict container only, not of the observations it references.
sys.getsizeof(structured_obs)
Out[4]:
In [5]:
# Gather every station id that appears in at least one of the loaded years
# and report how many there are.
# NOTE(review): this is the UNION of station ids across years.  If
# "persistent" is meant to be stations present in EVERY year, an intersection
# is needed instead -- confirm the intent before loading more than one year.
years = list(structured_obs)
persistent_stations = set().union(*(structured_obs[year] for year in years))
print(len(persistent_stations))
In [6]:
# Print, for each year 1929-1932, how many distinct station ids (first
# whitespace-delimited field) occur among the lines that do not contain "STN".
# The count is computed by Spark, so only a single integer per year is
# returned to the driver instead of the full list of (station, count) pairs.
for year in range(1929, 1933):
    station_count = (
        sc.textFile("/user/schiefjm/weather/gsod/" + str(year))
        .filter(lambda line: "STN" not in line)
        .map(lambda line: line.split()[0])
        .distinct()
        .count()
    )
    print(year, station_count)