Karung | By Andrew Conti

Let's take a look at our data:


In [15]:
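import csv
# 'rb' is the Python 2 csv idiom; on Python 3 use open("sample.txt", newline="").
with open("sample.txt", 'rb') as f1:
    reader = csv.reader(f1)
    # The records contain no commas, so each row is a one-element list.
    for line in reader:
        print(line)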


['0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999']
['0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999']
['0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999']
['0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999']
['0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999']

In [35]:
!cat sample.txt


0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

Let's define the 'map' part of our MapReduce job:


In [31]:
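%%writefile max_temperature_map.py
#!/usr/bin/env python
'''
Mapper: emit "year<TAB>temperature" for every valid reading on stdin.
'''
import re
import sys

for line in sys.stdin:
    val = line.strip()
    # Fixed-width fields: year, signed temperature, quality code.
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    # Skip missing readings (+9999) and readings whose quality code is not accepted.
    if temp != "+9999" and re.match("[01459]", q):
        print("%s\t%s" % (year, temp))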


Writing max_temperature_map.py
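
To make the slice offsets in the mapper concrete, here is a minimal sketch that applies them to the first record of sample.txt. The helper name `parse_record` is hypothetical and is not part of the scripts in this notebook; the offsets and the quality-code check are copied from the mapper.

```python
import re

# First record of sample.txt, copied from the output shown earlier.
SAMPLE = ("0067011990999991950051507004+68750+023550FM-12+038299999"
          "V0203301N00671220001CN9999999N9+00001+99999999999")

def parse_record(line):
    """Return (year, temperature, quality), or None if the reading is unusable."""
    val = line.strip()
    year, temp, q = val[15:19], val[87:92], val[92:93]
    if temp != "+9999" and re.match("[01459]", q):
        return year, int(temp), q
    return None

print(parse_record(SAMPLE))  # ('1950', 0, '1')
```

Converting the temperature with `int` here is only for illustration; the mapper itself passes the signed string through and leaves the conversion to the reducer.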

Let's define the reduce:


In [32]:
%%writefile max_temperature_reduce.py
#!/usr/bin/env python
'''
Reducer: read "year<TAB>temperature" lines sorted by year and print the
maximum temperature for each year.
'''
import sys

(last_key, max_val) = (None, -sys.maxsize)
for line in sys.stdin:
    (key, val) = line.strip().split("\t")
    if last_key and last_key != key:
        # The key changed, so the previous year is complete: emit its maximum.
        print("%s\t%s" % (last_key, max_val))
        (last_key, max_val) = (key, int(val))
    else:
        (last_key, max_val) = (key, max(max_val, int(val)))

# Emit the maximum for the final year.
if last_key:
    print("%s\t%s" % (last_key, max_val))


Writing max_temperature_reduce.py
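
The reducer relies on its input arriving grouped by key, which is what the sort step guarantees. As a sketch of the same idea, the following uses `itertools.groupby` in place of the `last_key` bookkeeping. This is a swapped-in technique, not the script written above, and `max_by_key` is a hypothetical name.

```python
from itertools import groupby

def max_by_key(lines):
    """Yield (key, maximum value) for tab-separated lines already sorted by key."""
    pairs = (line.strip().split("\t") for line in lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, max(int(val) for _, val in group)

# Mapper output for the five sample records, grouped by year.
sorted_lines = ["1949\t+0111", "1949\t+0078",
                "1950\t+0000", "1950\t+0022", "1950\t-0011"]

for key, value in max_by_key(sorted_lines):
    print("%s\t%s" % (key, value))
```

Like the original reducer, this only gives correct results when all lines for a year are adjacent; unsorted input would produce several partial maxima for the same year.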

Let's test out our functions and see the results:

Simple test


In [38]:
!cat sample.txt | python max_temperature_map.py | sort | python max_temperature_reduce.py

Hadoop test with a combiner function (for efficiency)


In [ ]:
%%cmd
hadoop jar HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input sample.txt \
-output output \
-mapper "max_temperature_map.py | sort | max_temperature_reduce.py" \
-reducer max_temperature_reduce.py
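
Running the reducer inside the mapper pipeline acts as a combiner: each map task emits only its local maximum per year. That is safe here because taking a maximum is associative and commutative, so the maximum of the partial maxima equals the overall maximum. The sketch below illustrates that property; the split of the sample readings into two map tasks is made up, and `max_per_year` is a hypothetical helper.

```python
def max_per_year(pairs):
    """Reduce (year, temp) pairs to a dict mapping year -> maximum temp."""
    result = {}
    for year, temp in pairs:
        result[year] = max(result.get(year, temp), temp)
    return result

# Hypothetical split of the five sample readings across two map tasks.
split_a = [("1950", 0), ("1950", 22), ("1949", 111)]
split_b = [("1950", -11), ("1949", 78)]

# Without a combiner: the reducer sees every reading.
direct = max_per_year(split_a + split_b)

# With a combiner: each split is reduced first, then the partial results are reduced again.
combined = max_per_year(list(max_per_year(split_a).items()) +
                        list(max_per_year(split_b).items()))

print(direct == combined)  # True
```

The same shortcut would not be valid for an aggregate such as a mean, where the mean of partial means generally differs from the overall mean.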

Results:


In [ ]:
Great, but that process was a little time-consuming.

Let's try this out in Pig.

Below is a Pig script that finds the maximum temperature per year, just as we did before.


In [ ]:
%%writefile max_temp.pig
-- max_temp.pig: Finds the maximum temperature by year
records = LOAD 'input/ncdc/micro-tab/sample.txt'
  AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
  (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
  MAX(filtered_records.temperature);
DUMP max_temp;
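
For readers new to Pig Latin, the sketch below mirrors the script's LOAD, FILTER, GROUP, and MAX steps in plain Python. The rows are an assumption: they are the (year, temperature, quality) values extracted from the five sample records, standing in for the tab-delimited file the script loads.

```python
from collections import defaultdict

# Assumed tab-delimited input: year, temperature, quality.
rows = ["1950\t0\t1", "1950\t22\t1", "1950\t-11\t1", "1949\t111\t1", "1949\t78\t1"]

# LOAD ... AS (year:chararray, temperature:int, quality:int)
records = [(y, int(t), int(q)) for y, t, q in (r.split("\t") for r in rows)]

# FILTER records BY temperature != 9999 AND quality is one of 0, 1, 4, 5, 9
filtered = [r for r in records if r[1] != 9999 and r[2] in (0, 1, 4, 5, 9)]

# GROUP filtered_records BY year
grouped = defaultdict(list)
for year, temperature, _ in filtered:
    grouped[year].append(temperature)

# FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature)
max_temp = sorted((year, max(temps)) for year, temps in grouped.items())
print(max_temp)  # [('1949', 111), ('1950', 22)]
```

Each Pig relation corresponds to one intermediate Python value here, which is a rough way to read the script: one statement, one transformed dataset.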

Start Pig in local mode:


In [ ]:
!pig -x local

Test out our script:


In [ ]:
!pig max_temp.pig

Results:

Version Control


In [23]:
cd C:\\Users\\Andrew\\Documents\\Hadoop\\Karung/


C:\Users\Andrew\Documents\Hadoop\Karung

In [26]:
!git add Karung.ipynb

In [27]:
!git commit


[master a76538e] created basic structure of presenation, added pig file
 1 file changed, 118 insertions(+), 46 deletions(-)

In [ ]: