In [ ]:
import pandas as pd
import elasticsearch
import json
import re

# Elasticsearch client shared by the index-creation and indexing cells below.
# NOTE(review): no hosts are passed, so this relies on the client's default
# connection settings — confirm they point at the intended cluster.
es = elasticsearch.Elasticsearch()

In this example, we use the Google geocoding API to translate addresses into geo-coordinates. Google imposes usage limits on the API. If you are using this script to index data, you may need to sign up for an API key to overcome these limits.

In [ ]:
from geopy.geocoders import GoogleV3
# Geocoder instance used by getLatLon() to look up coordinates.
# It is created without an API key here; swap in the commented-out line below
# to supply your own key.
geolocator = GoogleV3()
# geolocator = GoogleV3(api_key=<your_google_api_key>)

Import Data

Import restaurant inspection data into a Pandas dataframe

In [ ]:
# Load the inspection CSV; PHONE and INSPECTION DATE are forced to str so they
# are not parsed as numbers/dates on read.
# NOTE(review): the path argument is an empty string — set it to the location
# of the restaurant inspection CSV before running this cell.
t = pd.read_csv('', header=0, sep=',', dtype={'PHONE':str, 'INSPECTION DATE':str});

In [ ]:
## Helper Functions
from datetime import datetime
def str_to_iso(text):
    """Convert a ``MM/DD/YYYY`` date string to an ISO-8601 timestamp string.

    Parameters
    ----------
    text : str
        Date text such as ``'03/04/2014'``. Empty strings and non-string
        values are treated as "no date".

    Returns
    -------
    str or None
        ``'YYYY-MM-DDTHH:MM:SS'`` when ``text`` matches a known format,
        otherwise ``None``.
    """
    if not isinstance(text, str) or text == '':
        return None
    # Tuple of accepted input formats; extend it to support more layouts.
    for fmt in ('%m/%d/%Y',):
        try:
            return datetime.strptime(text, fmt).isoformat()
        except ValueError:
            # Not this format — try the next one (if any).
            continue
    return None
def getLatLon(row):
    """Geocode a row, preferring its street address and falling back to zipcode.

    Parameters
    ----------
    row : mapping
        Must provide ``'Address'`` and ``'Zipcode'`` entries (a DataFrame row
        or a dict). Empty-string / ``None`` entries are skipped.

    Returns
    -------
    list or None
        ``[longitude, latitude]`` from the first lookup that succeeds, or
        ``None`` when neither field is usable or nothing could be geocoded.
    """
    # Try the most specific query first; the zipcode is only used when the
    # address is missing or the geocoder returns no match for it.
    for field in ('Address', 'Zipcode'):
        query = row[field]
        if query is None or query == '':
            continue
        # The deprecated `sensor` argument is intentionally not passed: its
        # old default was False, and newer geopy releases reject it.
        location = geolocator.geocode(query, timeout=10000)
        if location is not None:
            return [location.longitude, location.latitude]
    return None
def getAddress(row):
    """Build a single-line address string from a row's location fields.

    Parameters
    ----------
    row : mapping
        Must provide string ``'Building'``, ``'Street'`` and ``'Boro'``
        entries (a DataFrame row or a dict).

    Returns
    -------
    str
        ``'<Building> <Street> <Boro>,NY'`` with runs of spaces collapsed to a
        single space, or ``''`` when any of the three fields is empty.
    """
    parts = (row['Building'], row['Street'], row['Boro'])
    if any(part == '' for part in parts):
        # Incomplete address: return '' so callers can test for it.
        return ''
    address = parts[0] + ' ' + parts[1] + ' ' + parts[2] + ',NY'
    return re.sub(' +', ' ', address)

def combineCT(x):
    """Build a per-inspection key of the form ``'<YYYY-MM-DD>_<Camis>'``.

    Parameters
    ----------
    x : mapping
        Must provide ``'Inspection_Date'`` (ISO timestamp string, or
        ``None``/``''`` when missing) and ``'Camis'``.

    Returns
    -------
    str
        The first 10 characters of the inspection date joined to the Camis
        value with an underscore. A missing date contributes an empty prefix.
    """
    # Slice the string itself: indexing with [0] first would keep only the
    # first character of the ISO date and collapse every inspection of a
    # restaurant into one key.
    inspection_date = x['Inspection_Date'] or ''
    return str(inspection_date)[0:10] + '_' + str(x['Camis'])

Data preprocessing

In [ ]:
# Normalise column names in one pass: title-case them, then swap spaces for
# underscores (e.g. 'INSPECTION DATE' -> 'Inspection_Date').
t.columns = [name.title().replace(' ', '_') for name in t.columns]

# Missing values become empty strings so the helper functions can test for ''.
t.fillna('', inplace=True)

# Rewrite each date column as ISO-formatted text (None where no date parses).
for date_col in ('Inspection_Date', 'Record_Date', 'Grade_Date'):
    t[date_col] = t[date_col].map(str_to_iso)

# Build the Address string from the Building, Street and Boro columns.
t['Address'] = t.apply(getAddress, axis=1)

Create a dictionary of unique addresses. We do this to avoid calling the Google geocoding API multiple times for the same address.

In [ ]:
# One row per distinct (Address, Zipcode) pair. The original row labels of t
# are kept as the index of this frame.
addDict = t[['Address', 'Zipcode']].drop_duplicates().copy(deep=True)
# Placeholder column for coordinates; None marks "not geocoded yet".
addDict['Coord'] = [None] * len(addDict)

Get the geolocation (longitude and latitude) for each unique address. This step can take a while because it calls the Google geocoding API once per unique address.

In [ ]:
# Geocode every address that has no coordinates yet. Rows that already hold a
# result are skipped, so the cell can be re-run after an interruption (e.g. an
# API quota error) without repeating finished lookups.
for item_id, item in addDict.iterrows():
    if item_id % 100 == 0:
        # Lightweight progress indicator. item_id is the original row label,
        # so the printed values are not evenly spaced.
        print(item_id)
    # .at writes straight into addDict; chained indexing
    # (addDict['Coord'][item_id] = ...) may assign to a temporary copy.
    if addDict.at[item_id, 'Coord'] is None:
        addDict.at[item_id, 'Coord'] = getLatLon(item)
# Save address dictionary to CSV
# NOTE(review): no save call is present here — add one if the geocoding
# results should be persisted between sessions.

In [ ]:
# Merge coordinates into original table.
# addDict is unique per (Address, Zipcode) pair, so join on both keys; joining
# on Address alone would duplicate rows whenever one address (including the
# empty address) appears with several zipcodes.
t1 = t.merge(
    addDict[['Address', 'Zipcode', 'Coord']],
    on=['Address', 'Zipcode'],
    how='left',
)

# NOTE(review): t2 was used below without ever being defined; it is taken here
# to be a working copy of the merged table — confirm no column selection was
# intended at this point.
t2 = t1.copy()

# Keep only 1 value of score and grade per inspection
t2['raw_num'] = t2.index
t2['RI'] = t2.apply(combineCT, axis=1)
# True for the first row of every inspection key, False for its repeats.
first_rows = ~t2['RI'].duplicated()

# Copy Score/Grade onto the first row of each inspection only; repeats stay
# None. A single .loc[rows, column] assignment is used so the write lands in
# t2 itself rather than in a temporary produced by chained indexing.
t2['Unique_Score'] = None
t2.loc[first_rows, 'Unique_Score'] = t2.loc[first_rows, 'Score']
t2['Unique_Grade'] = None
t2.loc[first_rows, 'Unique_Grade'] = t2.loc[first_rows, 'Grade']

# Drop the per-violation originals before renaming, otherwise the frame would
# end up with two 'Score' and two 'Grade' columns.
t2 = t2.drop(columns=['Score', 'Grade']).rename(
    columns={'Unique_Grade': 'Grade', 'Unique_Score': 'Score'}
)
t2['Grade'] = t2['Grade'].fillna('')

In [ ]:

Index Data

In [ ]:
### Create and configure Elasticsearch index

# Name of index and document type
index_name = 'nyc_restaurants'
doc_name = 'inspection'

# Delete the nyc_restaurants index if it already exists, so that re-running the
# notebook starts from an empty index instead of mixing old and new documents.
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)

# Create a fresh nyc_restaurants index
es.indices.create(index=index_name)

In [ ]:
# Add mapping: load the field mapping from the JSON file next to the notebook
# and register it for the document type before any document is indexed.
with open('./inspection_mapping.json') as json_mapping:
    d = json.load(json_mapping)

es.indices.put_mapping(index=index_name, doc_type=doc_name, body=d)

# Index data: one Elasticsearch document per row, using the row label as the
# document id so that re-indexing overwrites instead of duplicating.
for item_id, item in t2.iterrows():
    if item_id % 1000 == 0:
        # Lightweight progress indicator for long runs.
        print(item_id)
    thisItem = item.to_dict()
    thisDoc = json.dumps(thisItem)

    # write to elasticsearch
    es.index(index=index_name, doc_type=doc_name, id=item_id, body=thisDoc)

In [ ]: