In [1]:
# Python library imports: numpy, random, sklearn, pandas, etc

import warnings
warnings.filterwarnings('ignore')

import sys
import random
import numpy as np

from sklearn import linear_model, metrics, svm
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, cross_val_score

from patsy import dmatrices
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
# function to read HDFS file into dataframe using PyDoop
import pydoop.hdfs as hdfs
def read_csv_from_hdfs(path):
    # read a single tab-delimited key/value file from HDFS into a DataFrame
    fhandle = hdfs.open(path)
    print("validating file : %s" % fhandle)
    cols = ['key', 'value']
    df = pd.read_csv(fhandle, names=cols, dtype=None, delimiter="\t")
    fhandle.close()
    return df
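
# A MapReduce job usually writes several part files, not just part-r-00000.
# A minimal sketch for reading a whole output directory, assuming pydoop's
# hdfs.ls (the helper name and directory layout here are illustrative):
def read_csv_dir_from_hdfs(dir_path):
    pieces = []
    for part in hdfs.ls(dir_path):
        if 'part-' not in part:  # skip _SUCCESS and other non-data files
            continue
        fhandle = hdfs.open(part)
        pieces.append(pd.read_csv(fhandle, names=['key', 'value'], delimiter="\t"))
        fhandle.close()
    return pd.concat(pieces, ignore_index=True)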

In [3]:
def extract_data_as_frame(in_data):
    # record layout: LONGITUDE  LATITUDE  T_DAILY_MEAN  SUR_TEMP_DAILY_AVG  SOIL_MOISTURE_10_DAILY
    data_list = []
    for fields in in_data:
        # temperatures below -30 are missing-value sentinels; skip those records
        if float(fields[3]) < -30 or float(fields[2]) < -30:
            continue
        data_list.append(dict(lon=float(fields[0]), lat=float(fields[1]),
                              day=float(fields[2]), surface=float(fields[3]),
                              moisture=float(fields[4])))
    data_as_frame = pd.DataFrame(data_list, columns=['lat', 'lon', 'day', 'surface', 'moisture'])
    return data_as_frame
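
# Quick smoke test on two synthetic records (hypothetical values) in the
# LONGITUDE|LATITUDE|T_DAILY_MEAN|SUR_TEMP_DAILY_AVG|SOIL_MOISTURE_10_DAILY
# layout; the second row carries -9999 sentinels and is dropped by the filter:
sample = pd.Series(['-97.5|35.2|21.4|24.1|0.18',
                    '-104.3|38.1|-9999.0|-9999.0|0.05']).str.split('|')
print(extract_data_as_frame(sample))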

In [4]:
def extract_geo_data(in_data):
    # coordinate columns only (no sentinel filtering)
    geo_list = [dict(lat=fields[1], lon=fields[0]) for fields in in_data]
    geo_key = pd.DataFrame(geo_list, columns=['lat', 'lon'])
    return geo_key

In [5]:
def extract_temp_data(in_data):
    # daily-mean and surface-temperature columns only
    temp_list = [dict(day=fields[2], surface=fields[3]) for fields in in_data]
    temp_values = pd.DataFrame(temp_list, columns=['day', 'surface'])
    return temp_values

In [6]:
def extract_soil_moisture(in_data):
    # soil-moisture column only
    moisture_list = [dict(moisture=fields[4]) for fields in in_data]
    moisture_values = pd.DataFrame(moisture_list, columns=['moisture'])
    return moisture_values
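
# Note: the three helpers above re-walk the raw records without the sentinel
# filter applied in extract_data_as_frame. If filtered rows are acceptable,
# the same views are simply column slices of the full frame, e.g.:
#   geo_key = data_as_frame[['lat', 'lon']]
#   temp_values = data_as_frame[['day', 'surface']]
#   moisture_values = data_as_frame[['moisture']]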

In [7]:
result = read_csv_from_hdfs('/user/cloudera/sci_data_1_out/part-r-00000')
# the value column holds '|'-delimited records:
# LONGITUDE  LATITUDE  T_DAILY_MEAN  SUR_TEMP_DAILY_AVG  SOIL_MOISTURE_10_DAILY
# splitting yields a Series of field lists, which extract_data_as_frame turns into a DataFrame
data_val = result.iloc[:, [1]]
data_val2 = data_val.value.str.split('|')
data_as_frame = extract_data_as_frame(data_val2)

# keep stations north of latitude 23.8 and model soil moisture
# from location and the two temperature readings
data_north = data_as_frame[data_as_frame['lat'] > 23.8]
y, X = dmatrices('moisture ~ lat + lon + day + surface', data_north, return_type="dataframe")
print(X.columns)
y = np.ravel(y)
model = LogisticRegression()
model = model.fit(X, y)
y.mean()
# one row per predictor; each entry is that predictor's coefficients across classes
pd.DataFrame(list(zip(X.columns, np.transpose(model.coef_))))


validating file : <pydoop.hdfs.file.hdfs_file object at 0x45a4f90>
Index([u'Intercept', u'lat', u'lon', u'day', u'surface'], dtype='object')
Out[7]:
           0                                                  1
0  Intercept  [-0.0280144477647, -0.053052304366, 0.01672769...
1        lat  [-0.689967315305, -0.153928247361, 0.211091392...
2        lon  [0.388184946552, 0.0697258788407, -0.153419980...
3        day                  [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
4    surface  [0.165362569539, -0.0523609530014, -0.24406524...
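
Since train_test_split and cross_val_score are already imported, a natural
follow-up is to evaluate the classifier on held-out data. A minimal sketch,
assuming the X and y built in In [7]:

# hold out 30% of the rows for testing (random_state fixed for repeatability)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)
model2 = LogisticRegression().fit(X_train, y_train)
print(metrics.accuracy_score(y_test, model2.predict(X_test)))

# 10-fold cross-validated accuracy on the full dataset
scores = cross_val_score(LogisticRegression(), X, y, scoring='accuracy', cv=10)
print(scores.mean())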