In [1]:
    
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
import json
import os
import numpy as np
import re
import requests
from distutils.util import strtobool
from dateutil import parser
    
In [2]:
    
def init_record_parsing(tdlr_all_licenses):
    """ Initializes a dictionary that contains the parameters required to
    parse a TDLR record:
    - Number of TDLR record fields to skip
    - TDLR record field names
    - Sub-column names of composite fields

    Args:
        tdlr_all_licenses: Nested dictionary that stores TDLR records

    Returns:
        parsing_params: Dictionary that contains parameters required to
                        parse a TDLR record

                        Key:               Value:
                        ---                ------
                        numskippedfields   Number of fields to skip

                        fieldname          List that contains formatted
                                           TDLR record field names

                        subcolumntypes     Dictionary that maps a composite
                                           field name to its sub-column
                                           names """
    fieldname = []
    for elem in tdlr_all_licenses['meta']['view']['columns']:
        fieldname.append(elem['fieldName'])
    # Metadata (system) fields have names that start with ':'; skip past the
    # last of these when mapping field names to record values.
    num_skipped_fields = np.argwhere([re.match('^:.*', elem) is not None
                                      for elem in fieldname])[-1][0] + 1
    fieldname = [re.sub(r'[_\s]', '', elem) for elem in fieldname]
    fieldname = [re.sub('mmddccyy', '', elem) for elem in fieldname]
    sub_column_types = {}
    for idx in np.arange(num_skipped_fields,
                         len(tdlr_all_licenses['meta']['view']['columns'])):
        column = tdlr_all_licenses['meta']['view']['columns'][idx]
        if 'subColumnTypes' in column:
            sub_column_types[fieldname[idx]] = [re.sub(r'[\s_]', '', elem)
                                                for elem in column['subColumnTypes']]
    
    fieldname = fieldname[num_skipped_fields:]
    
    return {'numskippedfields': num_skipped_fields,
            'fieldname': fieldname,
            'subcolumntypes': sub_column_types}
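# For reference, init_record_parsing() assumes the Socrata export layout in
# which 'meta' -> 'view' -> 'columns' lists the record fields in order, the
# leading system fields have names that start with ':', and composite columns
# (e.g. the mailing address location) carry a 'subColumnTypes' list. A
# hypothetical, abridged excerpt of that structure (field names illustrative):
#
# {"meta": {"view": {"columns": [
#      {"fieldName": ":sid"},
#      {"fieldName": ":created_at"},
#      {"fieldName": "license_type"},
#      {"fieldName": "mailing_address_location",
#       "subColumnTypes": ["latitude", "longitude"]}]}},
#  "data": [[ ... one list of field values per record ... ]]}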
def init_index_mapping():
    """ Initializes a nested dictionary that defines the mapping of an 
    Elasticsearch index that contains TDLR records.
    
    Args:
        None
    
    Returns:
        mapping: Nested dictionary that defines the mapping of an
                 Elasticsearch index that contains TDLR records."""
    mapping =\
    {
        "mappings": {
            "tdlr": {
                "properties": {
                    "licensetype": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "licensenumber": {
                        "type": "string",
                        "index": "not_analyzed"
                    },             
                    "businesscounty": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "businessname": {"type": "string"},
                    "businessaddressline1": {"type": "string"},
                    "businessaddressline2": {"type": "string"},
                    "businesscity": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "businessstate": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "businesszip": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "businesstelephone": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "licenseexpirationdate": {
                        "type": "date",
                        "format": "date_optional_time"
                    },
                    "ownername": {"type": "string"},
                    "mailingaddressline1": {"type": "string"},
                    "mailingaddressline2": {"type": "string"},
                    "mailingaddresscity": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "mailingaddressstate": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "mailingaddresszip": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "mailingaddresscountycode": {"type": "integer"},
                    "mailingaddresscounty": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "ownertelephone": {
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "licensesubtype":{
                        "type": "string",
                        "index": "not_analyzed"
                    },
                    "continuingeducationflag": {"type": "boolean"},
                    "mailingaddresslocation": {"type": "geo_point"}
                }
            }        
        }
    }
    
    return mapping
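# Note: the mapping above uses the pre-5.x Elasticsearch syntax ("string"
# fields with "index": "not_analyzed" for exact-match filtering); it is passed
# verbatim as the body of indices.create() further below. On a 5.x+ cluster
# the rough equivalent would be the "keyword" field type.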
def parse_int(string_value):
    """ Parses an integer number

    Args:
        string_value: String that contains an integer

    Returns:
        int_value: Integer number, or None if the value cannot be parsed"""
    try:
        int_value = int(string_value)
    except (TypeError, ValueError):
        int_value = None
    return int_value
def parse_float(string_value):
    """ Parses a floating point number

    Args:
        string_value: String that contains a floating point number

    Returns:
        float_value: Floating point number, or None if the value cannot be
                     parsed"""
    try:
        float_value = float(string_value)
    except (TypeError, ValueError):
        float_value = None

    return float_value
def parse_boolean(string_value):
    """ Parses a boolean value

    Args:
        string_value: String that contains a boolean value

    Returns:
        boolean_value: Boolean value (False if the value cannot be parsed)"""
    try:
        boolean_value = bool(strtobool(string_value))
    except (TypeError, ValueError, AttributeError):
        boolean_value = False
    return boolean_value
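# Expected behaviour of the three parsers above (illustrative inputs):
#   parse_int('254')       -> 254       parse_int(None)     -> None
#   parse_float('30.27')   -> 30.27     parse_float(None)   -> None
#   parse_boolean('true')  -> True      parse_boolean(None) -> False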
def parse_citystatezip(citystatezip):
    """ Parses a string that stores a city, state, & zip code.
    
    Args:
        citystatezip: String that stores a city, state, & zip code.
    
    Returns:
        citystatezip: Dictionary that stores a city, state, & zip code."""
    fields = [None] * 3
    
    patternobj = re.compile(r'^([A-Z0-9\s+-]+)\s([A-Z]{2})\s([0-9-]+)$')
    if citystatezip is not None:
        matchobj = patternobj.match(citystatezip)
        if matchobj is not None:
            fields = [matchobj.group(1),
                      matchobj.group(2),
                      matchobj.group(3)]
    
    return dict(zip(['city', 'state', 'zip'], fields))
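# Example (illustrative input):
#   parse_citystatezip('AUSTIN TX 78701')
#   -> {'city': 'AUSTIN', 'state': 'TX', 'zip': '78701'}
# A value that does not match the pattern (or None) yields
#   {'city': None, 'state': None, 'zip': None}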
def format_record(raw_record,
                  parsing_params):
    """ Formats a TDLR record
    
    Args:
        raw_record: List that contains a raw TDLR record
        
        parsing_params: Dictionary that contains parameters required to
                        parse a TDLR record

                        Key:               Value:
                        ---                ------
                        numskippedfields   Number of fields to skip

                        fieldname          List that contains formatted
                                           TDLR record field names

                        subcolumntypes     Dictionary that maps a composite
                                           field name to its sub-column names

    Returns:
        formatted_record: Dictionary that stores a formatted TDLR record"""
    tdlr_record = dict(zip(parsing_params['fieldname'],
                           raw_record[parsing_params['numskippedfields']:]))
    for key in parsing_params['subcolumntypes']:
        tdlr_record[key] = dict(zip(parsing_params['subcolumntypes'][key],
                                    tdlr_record[key]))
    tdlr_record['continuingeducationflag'] =\
        parse_boolean(tdlr_record['continuingeducationflag'])
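    # License expiration dates arrive as MMDDYYYY; a 7-character value means
    # the month is missing its leading zero, so restore it before parsing.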
    licenseexpirationdate = tdlr_record['licenseexpirationdate']
    if len(licenseexpirationdate) == 7:
        licenseexpirationdate = '0' + licenseexpirationdate
    licenseexpirationdate =\
        datetime.strptime(licenseexpirationdate, "%m%d%Y")
    
    tdlr_record['licenseexpirationdate'] =\
        licenseexpirationdate.strftime('%Y-%m-%d')
    mailingaddresscitystatezip = tdlr_record.pop('mailingaddresscitystatezip')
    mailingaddresscitystatezip =\
        parse_citystatezip(mailingaddresscitystatezip)
    tdlr_record['mailingaddresscity'] = mailingaddresscitystatezip['city']
    tdlr_record['mailingaddressstate'] = mailingaddresscitystatezip['state']
    tdlr_record['mailingaddresszip'] = mailingaddresscitystatezip['zip']
    businesscitystatezip =\
        parse_citystatezip(tdlr_record.pop('businesscitystatezip'))
    tdlr_record['businesscity'] = businesscitystatezip['city']
    tdlr_record['businessstate'] = businesscitystatezip['state']
    tdlr_record['businesszip'] = businesscitystatezip['zip']
    mailingaddresslocation = tdlr_record.pop('mailingaddresslocation')
    tdlr_record['mailingaddresslocation'] =\
        {'lat': parse_float(mailingaddresslocation['latitude']),
         'lon': parse_float(mailingaddresslocation['longitude'])}
        
    for key in tdlr_record['mailingaddresslocation']:
        if tdlr_record['mailingaddresslocation'][key] is None:
            tdlr_record['mailingaddresslocation'][key] = -1.0
    tdlr_record['mailingaddresscountycode'] =\
        parse_int(tdlr_record['mailingaddresscountycode'])
        
    return tdlr_record
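# A formatted record is a flat dictionary keyed by the field names in the
# index mapping above, e.g. (values purely illustrative):
#   {'licensetype': '...', 'licensenumber': '...',
#    'licenseexpirationdate': '2017-05-31', 'continuingeducationflag': True,
#    'businesscity': 'AUSTIN', 'businessstate': 'TX', 'businesszip': '78701',
#    'mailingaddresslocation': {'lat': 30.27, 'lon': -97.74}, ...}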
    
In [3]:
    
data_path = "./Data"
data_file = 'tdlrAllLicenses.json'
datafile_fullpath = os.path.join(data_path, data_file)
if not os.path.exists(data_path):
    os.mkdir(data_path)

if not os.path.exists(datafile_fullpath):
    download_url = "https://data.texas.gov/api/views/7358-krk7/rows.json?" +\
                   "accessType=DOWNLOAD"

    requestobj = requests.get(download_url)

    with open(datafile_fullpath, "w") as outfile:
        json.dump(requestobj.json(), outfile)
with open(datafile_fullpath, "r") as infile:
    tdlr_all_licenses = json.load(infile)
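# Quick sanity check on the loaded export: the parsing and indexing steps
# below rely on the 'meta' -> 'view' -> 'columns' field list and the 'data'
# list of raw records.
print("Number of columns: %d, number of records: %d"
      % (len(tdlr_all_licenses['meta']['view']['columns']),
         len(tdlr_all_licenses['data'])))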
    
In [4]:
    
clusterobj = Elasticsearch('localhost:9200')
if not clusterobj.indices.exists('texasopendata'):
    clusterobj.indices.create('texasopendata', body=init_index_mapping())
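# Optional check that the index and its mapping were created as intended;
# get_mapping() is part of the elasticsearch-py indices client.
print(clusterobj.indices.get_mapping(index='texasopendata'))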
    
In [5]:
    
parsing_params = init_record_parsing(tdlr_all_licenses)
number_documents = len(tdlr_all_licenses['data'])
actions = []
for idx in np.arange(0, number_documents):
    actions.append({'_index': 'texasopendata',
                    '_type': 'tdlr',
                    '_source': format_record(tdlr_all_licenses['data'][idx],
                                              parsing_params)})
    
    if len(actions) % 1000 == 0:
        print("Completed formatting document #%d" % (idx))
        helpers.bulk(clusterobj, actions)
        actions = []
if len(actions) > 0:
    helpers.bulk(clusterobj, actions)
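# Refresh the index so the freshly bulk-indexed documents become searchable,
# then confirm the document count matches the number of TDLR records.
clusterobj.indices.refresh(index='texasopendata')
print("Indexed %d of %d documents"
      % (clusterobj.count(index='texasopendata')['count'], number_documents))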
    
    
In [ ]: