In [1]:
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
import json
import os
import numpy as np
import re
import requests
from distutils.util import strtobool
from dateutil import parser
In [2]:
def init_record_parsing(tdlr_all_licenses):
    """ Initializes a dictionary that contains the parameters required to
    parse a raw TDLR record.

    Column metadata is read from tdlr_all_licenses['meta']['view']['columns'].
    Every column up to and including the LAST one whose field name starts
    with ':' is skipped. Field names are normalised by removing underscores
    and whitespace first, and the 'mmddccyy' marker afterwards.

    Args:
        tdlr_all_licenses: Nested dictionary that stores TDLR records. Each
                           column entry must provide 'fieldName' and may
                           provide 'subColumnTypes'.

    Returns:
        parsing_params: Dictionary that contains parameters required to
                        parse a TDLR record

                        Key:              Value:
                        ---               ------
                        numskippedfields  Number of leading fields to skip
                                          (0 when no field name starts
                                          with ':')
                        fieldname         List that contains the formatted
                                          names of the retained fields
                        subcolumntypes    Dictionary that maps a formatted
                                          field name to the list of its
                                          formatted sub-column names, for
                                          retained columns that declare
                                          'subColumnTypes' """
    columns = tdlr_all_licenses['meta']['view']['columns']
    raw_names = [column['fieldName'] for column in columns]

    # Index of the last ':'-prefixed column. default=-1 means "skip nothing"
    # when no such column exists, instead of failing on an empty search result.
    last_prefixed_idx = max((idx for idx, name in enumerate(raw_names)
                             if name.startswith(':')),
                            default=-1)
    num_skipped_fields = last_prefixed_idx + 1

    # Raw strings keep '\s' a regex escape rather than an invalid str escape.
    fieldname = [re.sub(r'[_\s]', '', name) for name in raw_names]
    fieldname = [re.sub('mmddccyy', '', name) for name in fieldname]

    # Only retained columns can contribute sub-column definitions.
    sub_column_types = {}
    for idx in range(num_skipped_fields, len(columns)):
        if 'subColumnTypes' in columns[idx]:
            sub_column_types[fieldname[idx]] = [
                re.sub(r'[\s_]', '', sub_name)
                for sub_name in columns[idx]['subColumnTypes']]

    return {'numskippedfields': num_skipped_fields,
            'fieldname': fieldname[num_skipped_fields:],
            'subcolumntypes': sub_column_types}
def init_index_mapping():
    """ Builds the nested dictionary that defines the mapping of an
    Elasticsearch index that contains TDLR records.

    The mapping holds a single document type, "tdlr". Its string properties
    come in two flavours: those that also carry "index": "not_analyzed" and
    those that only declare their type.

    Args:
        None

    Returns:
        mapping: Nested dictionary that defines the mapping of an
                 Elasticsearch index that contains TDLR records."""
    exact_string = {"type": "string", "index": "not_analyzed"}
    plain_string = {"type": "string"}

    # (property name, property definition), listed in mapping order.
    field_specs = [
        ("licensetype", exact_string),
        ("licensenumber", exact_string),
        ("businesscounty", exact_string),
        ("businessname", plain_string),
        ("businessaddressline1", plain_string),
        ("businessaddressline2", plain_string),
        ("businesscity", exact_string),
        ("businessstate", exact_string),
        ("businesszip", exact_string),
        ("businesstelephone", exact_string),
        ("licenseexpirationdate", {"type": "date",
                                   "format": "date_optional_time"}),
        ("ownername", plain_string),
        ("mailingaddressline1", plain_string),
        ("mailingaddressline2", plain_string),
        ("mailingaddresscity", exact_string),
        ("mailingaddressstate", exact_string),
        ("mailingaddresszip", exact_string),
        ("mailingaddresscountycode", {"type": "integer"}),
        ("mailingaddresscounty", exact_string),
        ("ownertelephone", exact_string),
        ("licensesubtype", exact_string),
        ("continuingeducationflag", {"type": "boolean"}),
        ("mailingaddresslocation", {"type": "geo_point"}),
    ]

    # Copy every definition so that no two properties share one dict object.
    properties = {name: dict(definition) for name, definition in field_specs}

    return {"mappings": {"tdlr": {"properties": properties}}}
def parse_int(string_value):
    """ Parses an integer number

    Args:
        string_value: String that contains an integer (may be None)

    Returns:
        int_value: Integer number, or None when string_value is None or
                   does not hold a valid integer"""
    try:
        # Built-in int() replaces the np.int alias, which NumPy 1.24 removed.
        int_value = int(string_value)
    except (TypeError, ValueError):
        # TypeError: None or an unsupported type.
        # ValueError: empty or non-numeric text.
        int_value = None

    return int_value
def parse_float(string_value):
    """ Parses a floating point number

    Args:
        string_value: String that contains a floating point number (may be
                      None)

    Returns:
        float_value: Floating point number, or None when string_value is
                     None or does not hold a valid floating point number"""
    try:
        # Built-in float() replaces the np.float alias, which NumPy 1.24
        # removed.
        float_value = float(string_value)
    except (TypeError, ValueError):
        # TypeError: None or an unsupported type.
        # ValueError: empty or non-numeric text.
        float_value = None

    return float_value
def parse_boolean(string_value):
    """ Parses a boolean value

    The accepted spellings are those of distutils.util.strtobool, compared
    case-insensitively. They are matched here directly, so the function
    depends neither on distutils (removed from the standard library in
    Python 3.12) nor on the np.bool alias (removed in NumPy 1.24).

    Args:
        string_value: String that contains an boolean value (may be None)

    Returns:
        boolean_value: Boolean value. False when string_value is not a
                       string (e.g. None) or is empty / whitespace only.

    Raises:
        ValueError: string_value is a non-empty string that is not a
                    recognised truth value"""
    true_strings = ('y', 'yes', 't', 'true', 'on', '1')
    false_strings = ('n', 'no', 'f', 'false', 'off', '0')

    # Missing values (None or any other non-string) are treated as False.
    if not isinstance(string_value, str):
        return False

    normalized = string_value.strip().lower()

    # An empty field is a missing value too, not a malformed one.
    if not normalized or normalized in false_strings:
        return False
    if normalized in true_strings:
        return True

    raise ValueError("invalid truth value %r" % (string_value,))
# "<city> <two-letter state> <zip>"; the city may contain upper-case letters,
# digits, whitespace, '+' and '-'. Written as a raw string so that '\s' is a
# regex escape rather than an invalid string escape, and compiled only once.
_CITYSTATEZIP_PATTERN = re.compile(
    r'^([A-Z0-9\s+-]+)\s([A-Z]{2})\s([0-9-]+)$')


def parse_citystatezip(citystatezip):
    """ Parses a string that stores a city, state, & zip code.

    Args:
        citystatezip: String that stores a city, state, & zip code
                      (may be None).

    Returns:
        citystatezip: Dictionary that stores a city, state, & zip code
                      under the keys 'city', 'state' & 'zip'. All three
                      values are None when the input is None or does not
                      match the expected layout."""
    fields = [None] * 3

    if citystatezip is not None:
        matchobj = _CITYSTATEZIP_PATTERN.match(citystatezip)

        if matchobj is not None:
            fields = list(matchobj.groups())

    return dict(zip(['city', 'state', 'zip'], fields))
def format_record(raw_record,
                  parsing_params):
    """ Formats a TDLR record

    Args:
        raw_record: List that contains a raw TDLR record

        parsing_params: Dictionary that contains parameters required to
                        parse a TDLR record

                        Key:              Value:
                        ---               ------
                        numskippedfields  Number of fields to skip
                        fieldname         List that contains formatted
                                          TDLR record field names
                        subcolumntypes    Dictionary that maps a field name
                                          to the list of its sub-column
                                          names

    Returns:
        formatted_record: Dictionary that stores a formatted TDLR record

    Raises:
        KeyError: An expected field is absent from the record
        ValueError: The license expiration date is present but cannot be
                    read as month, day & four digit year"""
    tdlr_record = dict(zip(parsing_params['fieldname'],
                           raw_record[parsing_params['numskippedfields']:]))

    # Expand composite columns into {sub-column name: value}. A missing
    # (None) composite value becomes an empty dict instead of crashing zip().
    for key, sub_names in parsing_params['subcolumntypes'].items():
        sub_values = tdlr_record[key]
        if sub_values is None:
            sub_values = ()
        tdlr_record[key] = dict(zip(sub_names, sub_values))

    tdlr_record['continuingeducationflag'] =\
        parse_boolean(tdlr_record['continuingeducationflag'])

    # Dates arrive as month/day/year digits; a 7 character value has lost
    # its leading zero. A missing date is stored as None rather than
    # crashing on len(None).
    licenseexpirationdate = tdlr_record['licenseexpirationdate']

    if licenseexpirationdate:
        if len(licenseexpirationdate) == 7:
            licenseexpirationdate = '0' + licenseexpirationdate

        licenseexpirationdate =\
            datetime.strptime(licenseexpirationdate, "%m%d%Y")

        tdlr_record['licenseexpirationdate'] =\
            licenseexpirationdate.strftime('%Y-%m-%d')
    else:
        tdlr_record['licenseexpirationdate'] = None

    # Split the combined "city state zip" fields into separate fields.
    mailingaddresscitystatezip =\
        parse_citystatezip(tdlr_record.pop('mailingaddresscitystatezip'))

    tdlr_record['mailingaddresscity'] = mailingaddresscitystatezip['city']
    tdlr_record['mailingaddressstate'] = mailingaddresscitystatezip['state']
    tdlr_record['mailingaddresszip'] = mailingaddresscitystatezip['zip']

    businesscitystatezip =\
        parse_citystatezip(tdlr_record.pop('businesscitystatezip'))

    tdlr_record['businesscity'] = businesscitystatezip['city']
    tdlr_record['businessstate'] = businesscitystatezip['state']
    tdlr_record['businesszip'] = businesscitystatezip['zip']

    # Reduce the location to a lat / lon pair. .get() tolerates a location
    # whose sub-columns are absent.
    mailingaddresslocation = tdlr_record.pop('mailingaddresslocation') or {}
    latitude = parse_float(mailingaddresslocation.get('latitude'))
    longitude = parse_float(mailingaddresslocation.get('longitude'))

    # NOTE(review): -1.0 marks an unknown coordinate but is itself a valid
    # latitude / longitude; kept unchanged so indexed documents stay
    # comparable. Confirm whether omitting the field would be preferable.
    tdlr_record['mailingaddresslocation'] =\
        {'lat': -1.0 if latitude is None else latitude,
         'lon': -1.0 if longitude is None else longitude}

    tdlr_record['mailingaddresscountycode'] =\
        parse_int(tdlr_record['mailingaddresscountycode'])

    return tdlr_record
In [3]:
# Local copy of the downloaded TDLR licenses JSON export.
data_path = "./Data"
data_file = 'tdlrAllLicenses.json'
datafile_fullpath = os.path.join(data_path, data_file)

# exist_ok replaces the separate "exists?" check, so a directory that
# appears between the check and the creation cannot make this cell fail.
os.makedirs(data_path, exist_ok=True)

download_url = "https://data.texas.gov/api/views/7358-krk7/rows.json?" +\
               "accessType=DOWNLOAD"

# Without a timeout an unresponsive server would block this cell forever.
requestobj = requests.get(download_url, timeout=300)

# Stop on an HTTP error instead of saving / parsing an error response.
requestobj.raise_for_status()

# Decode the payload once; the in-memory copy is what the file would yield
# if it were read back.
tdlr_all_licenses = requestobj.json()

with open(datafile_fullpath, "w") as outfile:
    json.dump(tdlr_all_licenses, outfile)
In [4]:
# Client for the Elasticsearch node at localhost:9200.
clusterobj = Elasticsearch('localhost:9200')

# Create the 'texasopendata' index, with the TDLR mapping, only when it does
# not exist yet.
index_is_missing = not clusterobj.indices.exists(index='texasopendata')

if index_is_missing:
    clusterobj.indices.create(index='texasopendata',
                              body=init_index_mapping())
In [5]:
# Format every raw TDLR record and send the documents to the
# 'texasopendata' index in batches of 1000.
parsing_params = init_record_parsing(tdlr_all_licenses)
number_documents = len(tdlr_all_licenses['data'])

actions = []

for idx, raw_record in enumerate(tdlr_all_licenses['data']):
    document = format_record(raw_record, parsing_params)

    actions.append({'_index': 'texasopendata',
                    '_type': 'tdlr',
                    '_source': document})

    # Flush as soon as a full batch has been collected.
    batch_is_full = len(actions) % 1000 == 0

    if batch_is_full:
        print("Completed formatting document #%d" % (idx))
        helpers.bulk(clusterobj, actions)
        actions = []

# Send whatever is left over from the last, partial batch.
if actions:
    helpers.bulk(clusterobj, actions)
In [ ]: