In [1]:
from pyspark import SparkConf, SparkContext

# Run Spark locally on a single thread, with an identifiable app name.
conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf=conf)

In [2]:
def parseLine(line):
    # Each record is a comma-separated line; keep the station ID, the entry
    # type (e.g. TMIN, TMAX, PRCP), and the reading, which is stored in
    # tenths of a degree Celsius and converted here to degrees.
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1
    return (stationID, entryType, temperature)
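
A quick sanity check of the parser confirms the scaling: field 3 holds the reading in tenths of a degree, so a raw value of -75 should come back as -7.5. The sample line below is hypothetical, shaped only to match the parsed records shown further down; the parser only looks at fields 0, 2, and 3.

# Hypothetical raw line, not taken from the actual 1800.csv file.
parseLine("ITE00100554,18000101,TMAX,-75,,,E,")
# -> ('ITE00100554', 'TMAX', -7.5)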

In [5]:
# Load the raw 1800 weather data and parse each line into
# (stationID, entryType, temperature) tuples.
lines = sc.textFile("file:///opt/ipython/work/example2/1800.csv")
parsedLines = lines.map(parseLine)

In [6]:
parsedLines.take(5)


Out[6]:
[('ITE00100554', 'TMAX', -7.5),
 ('ITE00100554', 'TMIN', -14.8),
 ('GM000010962', 'PRCP', 0.0),
 ('EZE00100082', 'TMAX', -8.6),
 ('EZE00100082', 'TMIN', -13.5)]

In [7]:
# Keep only the minimum-temperature (TMIN) entries.
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
minTemps.take(5)


Out[7]:
[('ITE00100554', 'TMIN', -14.8),
 ('EZE00100082', 'TMIN', -13.5),
 ('ITE00100554', 'TMIN', -12.5),
 ('EZE00100082', 'TMIN', -13.0),
 ('ITE00100554', 'TMIN', -4.6000000000000005)]

In [8]:
# Drop the entry type, leaving (stationID, temperature) pairs.
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
stationTemps.take(5)


Out[8]:
[('ITE00100554', -14.8),
 ('EZE00100082', -13.5),
 ('ITE00100554', -12.5),
 ('EZE00100082', -13.0),
 ('ITE00100554', -4.6000000000000005)]

In [9]:
# Reduce by station ID, keeping the lowest temperature seen at each station.
minTemps = stationTemps.reduceByKey(lambda x, y: min(x, y))
results = minTemps.collect()
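
As an aside, the lambda above only forwards its two arguments to Python's built-in min, so the same reduction could be written more compactly; this variant was not part of the original run:

# Equivalent reduction, passing the built-in min directly.
minTemps = stationTemps.reduceByKey(min)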

In [10]:
for result in results:
    print(result[0] + "\t{:.2f}C".format(result[1]))


ITE00100554	-14.80C
EZE00100082	-13.50C
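
Under the same assumptions about the data, the pipeline flips to per-station highs by filtering on TMAX and reducing with max. This is only a sketch and was not executed here:

# Sketch (not executed): per-station maximum temperatures.
maxTemps = parsedLines.filter(lambda x: "TMAX" in x[1]) \
                      .map(lambda x: (x[0], x[2])) \
                      .reduceByKey(max)
for stationID, temp in maxTemps.collect():
    print(stationID + "\t{:.2f}C".format(temp))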
