Parcel loading

Given a set of parcels from the parcel provider (assumed to be in FileGDB format), this notebook loads each feature type (currently one feature type per county) into its own table in Postgres. Unfortunately, due to the behavior of GDAL's OpenFileGDB driver (the same problem occurs with the ESRI FileGDB driver), loading everything into a single homogeneous table did not work, and loading all feature types in one pass (i.e. a separate table per feature type) also errored out at roughly 30% of parcels loaded.
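As a quick aside, the feature types (counties) present in the GDB can be listed up front; the sketch below assumes fiona is installed alongside GDAL and uses the ut_parcel_premium.gdb file referenced later in the notebook.

In [ ]:
import fiona

# Each layer in the FileGDB is one feature type (one county); these names should
# match the per-county tables created in the core_logic_2018 schema.
layers = fiona.listlayers('ut_parcel_premium.gdb')
print(len(layers), layers[:5])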

This notebook also assumes that a table for every feature type has already been created (e.g. as the result of a failed bulk load). Empty tables are backfilled with feature data in chunks of 50 counties at a time, and feature counts per table are then compared against the parcel provider's metadata.
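For context, the bulk load that originally created the (possibly empty) per-county tables would have looked roughly like the command below. This is a reconstruction based on the chunked command used later in the notebook (and it relies on the conn_str defined in the next cell), not the exact invocation that was run.

In [ ]:
# Hypothetical reconstruction of the original bulk load: with no layer names given,
# ogr2ogr loads every layer in the GDB into core_logic_2018 in a single pass.
!ogr2ogr -f "PostgreSQL" PG:"$conn_str" ut_parcel_premium.gdb/ -progress -lco SCHEMA=core_logic_2018 --config PG_USE_COPY YES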


In [ ]:
import psycopg2 as pg
import pandas as pd
import os

# connection used for running SQL directly (relies on the 'parcels' pg service definition)
conn = pg.connect('service=parcels')
# connection string consumed by ogr2ogr in the shell commands below
conn_str = os.environ.get('PARCELS_CONNECTION')

Load into individual parcel tables in chunks


In [ ]:
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

In [ ]:
with conn.cursor() as cur:
    cur.execute("select table_name from information_schema.tables where table_schema = 'core_logic_2018'")
    res = cur.fetchall()

In [ ]:
tables = [x[0] for x in res]

In [ ]:
res = []

with conn.cursor() as cur:
    for t in tables:
        cur.execute("select count(1) from core_logic_2018.{}".format(t))
        res.append({'table': t, 'count': cur.fetchone()[0]})

In [ ]:
to_do = [r for r in res if r['count'] == 0]

In [ ]:
for table_list in chunks([x['table'] for x in to_do], 50):
    layer_names = ' '.join(table_list)
    print("Loading {}".format(layer_names))
    !GDAL_MAX_DATASET_POOL_SIZE=100 ogr2ogr -f "PostgreSQL" PG:"$conn_str" ut_parcel_premium.gdb/ $layer_names -progress -lco SCHEMA=core_logic_2018 -lco OVERWRITE=yes --config PG_USE_COPY YES

Validate loading


In [ ]:
from IPython.display import display

df = pd.read_csv("./parcel-meta.csv")

for r in res:
    row = df[df.Filename == r['table']]
    if row.empty:
        display('No metadata row found for table {}'.format(r['table']))
    elif int(row['Records'].iloc[0]) != r['count']:
        display('Mismatch on parcel: {} csv count {} != db count of {}'.format(
            row['Filename'].iloc[0], row['Records'].iloc[0], r['count']))

Consolidate into single table

Create destination parcels table


In [ ]:
with conn.cursor() as c:
    c.execute('CREATE TABLE public.parcels_2018 AS TABLE public.parcels WITH NO DATA;')
conn.commit()

In [ ]:
sql = """
insert into public.parcels_2018 (ogc_fid, wkb_geometry, parcel_id, state_code, cnty_code, apn, 
            apn2, addr, city, state, zip, plus, std_addr, std_city, std_state, 
            std_zip, std_plus, fips_code, unfrm_apn, apn_seq_no, frm_apn, 
            orig_apn, acct_no, census_tr, block_nbr, lot_nbr, land_use, m_home_ind, 
            prop_ind, own_cp_ind, tot_val, lan_val, imp_val, tot_val_cd, 
            lan_val_cd, assd_val, assd_lan, assd_imp, mkt_val, mkt_lan, mkt_imp, 
            appr_val, appr_lan, appr_imp, tax_amt, tax_yr, assd_yr, ubld_sq_ft, 
            bld_sq_ft, liv_sq_ft, gr_sq_ft, yr_blt, eff_yr_blt, bedrooms, 
            rooms, bld_code, bld_imp_cd, condition, constr_typ, ext_walls, 
            quality, story_nbr, bld_units, units_nbr)
select {}, shape, parcel_id, state_code, cnty_code, apn, 
            apn2, addr, city, state, zip, plus, std_addr, std_city, std_state, 
            std_zip, std_plus, fips_code, unfrm_apn, apn_seq_no, frm_apn, 
            orig_apn, acct_no, census_tr, block_nbr, lot_nbr, land_use, m_home_ind, 
            prop_ind, own_cp_ind, tot_val, lan_val, imp_val, tot_val_cd, 
            lan_val_cd, assd_val, assd_lan, assd_imp, mkt_val, mkt_lan, mkt_imp, 
            appr_val, appr_lan, appr_imp, tax_amt, tax_yr, assd_yr, ubld_sq_ft, 
            bld_sq_ft, liv_sq_ft, gr_sq_ft, yr_blt, eff_yr_blt, bedrooms, 
            rooms, bld_code, bld_imp_cd, condition, constr_typ, ext_walls, 
            quality, story_nbr, bld_units, units_nbr
from core_logic_2018.{}
"""


try:
    # res[50:]: the first 50 tables were presumably consolidated in an earlier pass
    for r in res[50:]:
        with conn.cursor() as c:
            # some loaded tables kept the source 'objectid' column as their id column,
            # others got ogr2ogr's default 'ogc_fid'; use whichever exists
            c.execute("select count(1) from information_schema.columns where table_name = %(table)s and table_schema = 'core_logic_2018' and column_name = 'objectid'",
                      r)
            id_col = 'objectid' if c.fetchone()[0] == 1 else 'ogc_fid'
            c.execute(sql.format(id_col, r['table']))
    conn.commit()
except Exception as e:
    print(e)
    conn.rollback()

In [ ]:
with conn.cursor() as c:
    c.execute("""CREATE INDEX ON public.parcels_2018 USING btree (census_tr COLLATE pg_catalog."default");""")
    c.execute("""CREATE INDEX ON public.parcels_2018 USING btree ("substring"(census_tr::text, 0, 7) COLLATE pg_catalog."default");""")
    c.execute("""CREATE INDEX ON public.parcels_2018 USING btree (city COLLATE pg_catalog."default", state COLLATE pg_catalog."default");""")
    c.execute("""CREATE INDEX ON public.parcels_2018 USING btree (fips_code COLLATE pg_catalog."default");""")
    c.execute("""CREATE INDEX ON public.parcels_2018 USING btree (land_use COLLATE pg_catalog."default");""")
    c.execute("""CREATE UNIQUE INDEX ON public.parcels_2018 USING btree (parcel_id);""")
    c.execute("""CREATE INDEX ON public.parcels_2018 USING gist (wkb_geometry);""")
    c.execute("""CREATE INDEX ON public.parcels_2018 USING btree (state_code COLLATE pg_catalog."default", "substring"(census_tr::text, 0, 7) COLLATE pg_catalog."default");""")
    c.execute("""CREATE INDEX ON public.parcels_2018 USING btree (state COLLATE pg_catalog."default");""")
    c.execute("""CREATE INDEX ON public.parcels_2018 USING btree (story_nbr);""")