In [15]:
import csv
with open("sample.txt", 'rb') as f1:
reader = csv.reader(f1)
for line in reader:
print line
In [35]:
!cat sample.txt
In [31]:
%%writefile max_temperature_map.py
#!/usr/bin/env python
class max_temp_map(Mapreduce):
'''
A simple doc string
'''
import re
import sys
for line in sys.stdin:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if (temp != "+9999" and re.match("[01459]", q)):
print "%s\t%s" % (year, temp)
In [32]:
%%writefile max_temperature_reduce.py
#!/usr/bin/env python
class max_temp_reduce(Mapred):
'''
'''
import sys
(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
(key, val) = line.strip().split("\t")
if last_key and last_key != key:
print "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
print "%s\t%s" % (last_key, max_val)
Simple test
In [38]:
!cat sample.txt | max_temperature_map.py | \sort | max_temperature_reduce.py
Hadoop test with a combiner function (for efficiency)
In [ ]:
%%bash
# %%bash, not %%cmd: the backslash line continuations and the
# hadoop-*-streaming.jar glob below are shell syntax.
# Declare the combiner with -combiner instead of piping "map | sort | reduce"
# inside -mapper, which would bypass Hadoop's own shuffle/sort. Reusing the
# reducer as the combiner is valid here because max() is associative.
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
  -input sample.txt \
  -output output \
  -mapper max_temperature_map.py \
  -combiner max_temperature_reduce.py \
  -reducer max_temperature_reduce.py
In [ ]:
Great, but that process was a little time-consuming.
Let's try this out in Pig.
In [ ]:
%%writefile max_temp.pig
-- max_temp.pig: Finds the maximum temperature by year
-- Load (year, temperature, quality) records; the default PigStorage
-- loader splits fields on tabs.
records = LOAD 'input/ncdc/micro-tab/sample.txt'
AS (year:chararray, temperature:int, quality:int);
-- Keep only present readings (9999 is the missing-value sentinel) with an
-- acceptable quality code.
filtered_records = FILTER records BY temperature != 9999 AND
(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
-- One group per year; emit each year with its maximum temperature.
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
MAX(filtered_records.temperature);
DUMP max_temp;
In [ ]:
!pig -x local
In [ ]:
!pig max_temp.pig
In [23]:
cd C:\\Users\\Andrew\\Documents\\Hadoop\\Karung/
In [26]:
!git add Karung.ipynb
In [27]:
!git commit
In [ ]: