In [ ]:
### Import Packages
import pandas as pd
import numpy as np
import elasticsearch
import re
import json
from datetime import datetime
from pprint import pprint
import timeit
# Create the Elasticsearch client (connects to localhost:9200 by default)
es = elasticsearch.Elasticsearch()
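# The call above assumes a local node on the default port. A minimal sketch for
# pointing the client at a different node (the hostname below is hypothetical):
# es = elasticsearch.Elasticsearch(['http://elastic-host:9200'])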
In [ ]:
### Helper Functions
# Convert np.int64 to int: json.dumps cannot serialize NumPy int64 values
class SetEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.int64):
            return int(obj)  # np.int was removed from NumPy; the builtin int suffices
        # fall back to the default encoder for all other types
        return json.JSONEncoder.default(self, obj)
# Convert a date string into ISO format
def str_to_iso(text):
    if text != '':
        for fmt in ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
            try:
                return datetime.isoformat(datetime.strptime(text, fmt))
            except ValueError:
                pass
        raise ValueError('no valid date format found')
    else:
        return None
# Custom groupby aggregation: collapse each group into a list, or a scalar for single-row groups
def concatdf(x):
    if len(x) > 1:   # multiple values
        return list(x)
    else:            # single value
        return x.iloc[0]
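In [ ]:
### Helper function sanity checks
# Illustrative only; the sample values below are made up, not taken from the data.
print(str_to_iso('2014-05-28 14:30:00'))  # -> 2014-05-28T14:30:00
print(str_to_iso(''))                     # -> None
s = pd.Series(['a', 'b'])
print(concatdf(s))                        # multi-row group -> ['a', 'b']
print(concatdf(s.iloc[:1]))               # single-row group -> 'a'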
In [ ]:
### Import Data
# Load projects, resources & donations data
projects = pd.read_csv('./data/opendata_projects.csv', index_col=None, escapechar='\\')
donations = pd.read_csv('./data/opendata_donations.csv', index_col=None, escapechar='\\')
resources = pd.read_csv('./data/opendata_resources.csv', index_col=None, escapechar='\\')
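In [ ]:
# Optional: a quick look at what was loaded before any cleanup
for name, df in [('projects', projects), ('donations', donations), ('resources', resources)]:
    print(name, df.shape)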
In [ ]:
### Data Cleanup
# Replace NaN values with empty strings
projects = projects.fillna('')
donations = donations.fillna('')
resources = resources.fillna('')
# Clean up column names: strip leading spaces and underscores
donations.columns = donations.columns.map(lambda x: re.sub('^ ', '', x))
donations.columns = donations.columns.map(lambda x: re.sub('^_', '', x))
projects.columns = projects.columns.map(lambda x: re.sub('^_', '', x))
resources.columns = resources.columns.map(lambda x: re.sub('^ ', '', x))
resources.columns = resources.columns.map(lambda x: re.sub('^_', '', x))
# Add quotes around projectid values to match the quoted format used in projects / donations
resources['projectid'] = resources['projectid'].map(lambda x: '"' + x + '"')
# Add resource_ prefix to column names
resources.rename(columns={'vendorid': 'resource_vendorid',
                          'vendor_name': 'resource_vendor_name',
                          'item_name': 'resource_item_name',
                          'item_number': 'resource_item_number',
                          'item_unit_price': 'resource_item_unit_price',
                          'item_quantity': 'resource_item_quantity'}, inplace=True)
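In [ ]:
# Optional sanity check: after quoting, resources.projectid should use the same
# '"..."' format as projects.projectid, so the merges below can join on it
print(resources['projectid'].iloc[0])
print(projects['projectid'].iloc[0])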
In [ ]:
### Merge multiple resource rows per projectid into a single row
# NOTE: this section may take a few minutes to execute
concat_resource = pd.DataFrame()
gb = resources.groupby('projectid')
# timeit.default_timer() returns a timestamp; timeit.timeit() would instead time a dummy statement
start = timeit.default_timer()
for a in resources.columns.values:
    print(a)  # progress: current column being collapsed
    concat_resource[a] = gb[a].apply(concatdf)
end = timeit.default_timer()
print(end - start)
concat_resource['projectid'] = concat_resource.index
concat_resource = concat_resource.reset_index(drop=True)
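In [ ]:
### Toy illustration of the groupby/concatdf step above (made-up values)
toy = pd.DataFrame({'projectid': ['"p1"', '"p1"', '"p2"'],
                    'resource_item_name': ['books', 'pens', 'laptop']})
print(toy.groupby('projectid')['resource_item_name'].apply(concatdf))
# "p1" -> ['books', 'pens']  (multiple rows collapse into a list)
# "p2" -> 'laptop'           (a single row stays a scalar)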
In [ ]:
### Rename Project columns
projects.rename(columns=lambda x: "project_" + x, inplace=True)
projects.rename(columns={"project_projectid": "projectid"}, inplace=True)
projects.columns.values
In [ ]:
### Merge data into a single frame
data = pd.merge(projects, concat_resource, how='left', on='projectid')
data = pd.merge(donations, data, how='left', on='projectid')
data = data.fillna('')
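In [ ]:
# Both merges are left joins keyed on projectid, so the result should keep
# exactly one row per donation; a quick check of that assumption:
print(len(data) == len(donations))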
In [ ]:
### Process columns
# Convert date columns to ISO format
data['project_date_expiration'] = data['project_date_expiration'].map(str_to_iso)
data['project_date_posted'] = data['project_date_posted'].map(str_to_iso)
data['project_date_thank_you_packet_mailed'] = data['project_date_thank_you_packet_mailed'].map(str_to_iso)
data['project_date_completed'] = data['project_date_completed'].map(str_to_iso)
data['donation_timestamp'] = data['donation_timestamp'].map(str_to_iso)
# Create a location field combining longitude and latitude
# (Elasticsearch geo_point arrays use [lon, lat] order)
data['project_location'] = data[['project_school_longitude','project_school_latitude']].values.tolist()
del(data['project_school_latitude'])   # delete latitude field
del(data['project_school_longitude'])  # delete longitude field
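In [ ]:
# Optional: each project_location value is now a [lon, lat] pair, the array
# order Elasticsearch expects for geo_point fields
print(data['project_location'].iloc[0])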
In [ ]:
### Create and configure Elasticsearch index
# Name of index and document type
index_name = 'donorschoose'
doc_name = 'donation'
# Delete the donorschoose index if it already exists
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)
# Create the donorschoose index
es.indices.create(index=index_name)
# Add the mapping
with open('donorschoose_mapping.json') as json_mapping:
    d = json.load(json_mapping)
es.indices.put_mapping(index=index_name, doc_type=doc_name, body=d)
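In [ ]:
# The actual mapping lives in donorschoose_mapping.json. A minimal sketch of the
# shape such a file might take (the field choices below are assumptions, not the
# file's real contents):
# {
#   "donation": {
#     "properties": {
#       "project_location": {"type": "geo_point"},
#       "donation_timestamp": {"type": "date"}
#     }
#   }
# }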
In [ ]:
### Index Data into Elasticsearch
for don_id, thisDonation in data.iterrows():
    # print progress every 10,000 rows
    if don_id % 10000 == 0:
        print(don_id)
    thisDoc = json.dumps(thisDonation.to_dict(), cls=SetEncoder)
    # write the document to Elasticsearch, keyed by its donationid
    es.index(index=index_name, doc_type=doc_name, id=thisDonation['donationid'], body=thisDoc)
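In [ ]:
### Optional: faster indexing with the bulk helper
# The loop above round-trips to Elasticsearch once per document. A sketch of the
# same load using elasticsearch.helpers.bulk (same index/doc_type/id scheme as above):
from elasticsearch import helpers

def donation_actions():
    for _, row in data.iterrows():
        yield {
            '_index': index_name,
            '_type': doc_name,
            '_id': row['donationid'],
            # round-trip through SetEncoder so np.int64 values serialize cleanly
            '_source': json.loads(json.dumps(row.to_dict(), cls=SetEncoder)),
        }

helpers.bulk(es, donation_actions())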
In [ ]: