In [1]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

from shapely.geometry import Point, Polygon

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold

import zipfile
import requests
import os
import shutil



from downloading_funcs import addr_shape, down_extract_zip
from supp_funcs import *
import lnks

import warnings #DANGER: this notebook triggers a ton of warnings, silenced below
warnings.filterwarnings('ignore')

In [2]:
import geopandas as gpd

%matplotlib inline

In [3]:
#Load the BBL list
BBL12_17CSV = [
    'https://hub.arcgis.com/datasets/82ab09c9541b4eb8ba4b537e131998ce_22.csv',
    'https://hub.arcgis.com/datasets/4c4d6b4defdf4561b737a594b6f2b0dd_23.csv',
    'https://hub.arcgis.com/datasets/d7aa6d3a3fdc42c4b354b9e90da443b7_1.csv',
    'https://hub.arcgis.com/datasets/a8434614d90e416b80fbdfe2cb2901d8_2.csv',
    'https://hub.arcgis.com/datasets/714d5f8b06914b8596b34b181439e702_36.csv',
    'https://hub.arcgis.com/datasets/c4368a66ce65455595a211d530facc54_3.csv',
]
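
The pipeline below expects these files on disk as ./data/bbls/Basic_Business_License_in_<year>.csv (the download step is likely handled by the helpers in downloading_funcs). A minimal, hypothetical sketch of that step, assuming the six URLs above are ordered 2012 through 2017 and resolve directly to CSV downloads:

In [ ]:
def fetch_bbl_csvs(urls, years, dest='./data/bbls/'):
    #Hypothetical helper: save each yearly BBL CSV under the filename
    #pattern the pipeline expects. Assumes urls and years line up.
    os.makedirs(dest, exist_ok=True)
    for url, year in zip(urls, years):
        out = os.path.join(dest, 'Basic_Business_License_in_' + str(year) + '.csv')
        if os.path.exists(out):
            continue #Already downloaded
        r = requests.get(url, timeout=120)
        r.raise_for_status()
        with open(out, 'wb') as f:
            f.write(r.content)

#fetch_bbl_csvs(BBL12_17CSV, range(2012, 2018))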

In [4]:
def data_pipeline(shapetype, bbl_links, supplement=None,
                  dex=None, ts_lst_range=None):
    #A pipeline for group_e dataframe operations.
    #NOTE: bbl_links is validated but not otherwise used yet; the yearly
    #CSV paths are hardcoded below (see the TODO).
    
    
    #Validate inputs ----------------------------------------------------------
    if supplement:
        assert isinstance(supplement, list)
    assert isinstance(bbl_links, list)
    if ts_lst_range:
        assert isinstance(ts_lst_range, list)
        assert len(ts_lst_range) == 2 #Must be [start, end] in year.month encoding
    
    #We'll need our addresspoints and our shapefile
    if not dex:
        dex = addr_shape(shapetype)
    
    #Build the list of time units of analysis: months encoded as year + month/100
    if ts_lst_range:
        ts_lst = [x+(i/100) for i in range(1,13,1) for x in range(1980, 2025)]
        ts_lst = [x for x in ts_lst if
                  ts_lst_range[0] <= x <= ts_lst_range[1]]
    else:
        ts_lst = [x+(i/100) for i in range(1,13,1) for x in range(2012, 2017)]
    ts_lst = sorted(ts_lst)
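    
    #Encoding quirk: October is month 10, so e.g. October 2012 encodes as
    #2012 + 10/100 = 2012.1 and prints without a trailing zero; ordering
    #still works because month/100 grows monotonically within a year.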
    
    #Now we need to stack our BBL data ----------------------------------------
    
    #Read each yearly CSV and stack them into one dataframe
    frames = []
    col_len = None
    for i in range(2012, 2018):
        bblpth = './data/bbls/Basic_Business_License_in_'+str(i)+'.csv' #Messy hack
        #TODO: generalize bblpth above
        bbl = pd.read_csv(bblpth, low_memory=False)
        if col_len is None:
            col_len = len(bbl.columns) #Schema of the first year
        if len(bbl.columns) != col_len: #Warn if a year's schema differs from the first
            print('Column Mismatch!')
        frames.append(bbl)
    bbl_df = pd.concat(frames)
    del frames
        
    bbl_df.LICENSE_START_DATE      = pd.to_datetime(
        bbl_df.LICENSE_START_DATE)
    
    bbl_df.LICENSE_EXPIRATION_DATE = pd.to_datetime(
        bbl_df.LICENSE_EXPIRATION_DATE)
    
    bbl_df.LICENSE_ISSUE_DATE      = pd.to_datetime(
        bbl_df.LICENSE_ISSUE_DATE)

    
    bbl_df = bbl_df.sort_values('LICENSE_START_DATE')
        
    #Set up our time unit of analysis: encode each date as year + month/100
    bbl_df['month'] = bbl_df['LICENSE_START_DATE'].dt.year + (
        bbl_df['LICENSE_START_DATE'].dt.month/100
    )
    bbl_df['endMonth'] = bbl_df['LICENSE_EXPIRATION_DATE'].dt.year + (
        bbl_df['LICENSE_EXPIRATION_DATE'].dt.month/100
    )
    bbl_df['issueMonth'] = bbl_df['LICENSE_ISSUE_DATE'].dt.year + (
        bbl_df['LICENSE_ISSUE_DATE'].dt.month/100
    )
    #Licenses with no expiration, or one past the window, end at the last month
    bbl_df['endMonth'] = bbl_df['endMonth'].fillna(max(ts_lst))
    bbl_df.loc[bbl_df['endMonth'] > max(ts_lst), 'endMonth'] = max(ts_lst)
       
    #Drop rows with no start month, then sort by address ID and month
    bbl_df = bbl_df.dropna(subset=['month'])
    bbl_df = bbl_df.set_index(['MARADDRESSREPOSITORYID','month'])
    bbl_df = bbl_df.sort_index(ascending=True)
    bbl_df.reset_index(inplace=True)
    
    #Keep only valid (non-negative) address repository IDs
    bbl_df = bbl_df[bbl_df['MARADDRESSREPOSITORYID'] >= 0]
        
    bbl_df = bbl_df.dropna(subset=['LICENSESTATUS', 'issueMonth', 'endMonth',
                                   'MARADDRESSREPOSITORYID','month', 
                                   'LONGITUDE', 'LATITUDE'
                                  ])
    
    #Now that we have the BBL data, let's create our flag and points data -----
    
    #This is the addresspoints, passed from the dex param
    addr_df = dex[0]
    
    #Zip the latlongs
    addr_df['geometry'] = [
        Point(xy) for xy in zip(
            addr_df.LONGITUDE.apply(float), addr_df.LATITUDE.apply(float)
        )
    ]
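    #Each address is now a shapely Point(longitude, latitude),
    #ready for the spatial join against the shapefile below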
    
    addr_df['Points']   = addr_df['geometry'] #Duplicate, so raw retains points
    
    addr_df['dummy_counter'] = 1 #Always one, always dropped before export
    
    crs = 'EPSG:4326' #WGS84 lon/lat, matching the LONGITUDE/LATITUDE columns
    
    #Now we're stacking for each month ----------------------------------------
    
    for i in ts_lst: #Iterate through the list of months
        print('Month ' + str(i))
        strmfile_pth = './data/strm_file/' + str(i) + '_' + shapetype + '.csv'
        if os.path.exists(strmfile_pth):
            print('Skipping, ' + str(i) + ' stream file path already exists:')
            print(strmfile_pth)
            continue

        #dex[1] is the designated shapefile passed via the dex param,
        #and should match the shapetype defined in that param
        
        #Fresh copy of the dex[1] shapefile for this month
        shp_gdf = dex[1].copy()
        
        #Flag BBLs active in month i
        bbl_df['inRange'] = 0
        bbl_df.loc[(bbl_df.endMonth > i) & (bbl_df.month <= i), 'inRange'] = 1
        
        #Flag BBLs issued in month i
        bbl_df['isuFlag'] = 0
        bbl_df.loc[bbl_df.issueMonth == i, 'isuFlag'] = 1
        
        #Merge BBL and MAR datasets -------------------------------------------
        addr = pd.merge(addr_df, bbl_df, how='left',
                        left_on='ADDRESS_ID', right_on='MARADDRESSREPOSITORYID')
        addr = gpd.GeoDataFrame(addr, crs=crs, geometry=addr.geometry)
        
        shp_gdf.crs = addr.crs
        
        raw = gpd.sjoin(shp_gdf, addr, how='left', op='intersects')
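        
        #raw has one row per (shape, address point) pair; shapes with no
        #points are still kept once, courtesy of the left spatial join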
        
        #A simple percent of buildings with active flags per shape;
        #we call it a 'utilization index'
        sums  = raw.groupby('NAME').sum() #One aggregation pass serves all three flags
        numer = sums.inRange
        denom = sums.dummy_counter
        issue = sums.isuFlag
        
        flags = []
        
        utl_inx           = pd.DataFrame(numer/denom)
        
        utl_inx.columns   = [
            'Util_Indx_BBL'
        ]
        flags.append(utl_inx)
        
        #This is number of buildings with an active BBL in month i
        bbl_count         = pd.DataFrame(numer)
        
        bbl_count.columns = [
            'countBBL'
        ]
        flags.append(bbl_count)
        
        #This is number of buildings that were issued a BBL in month i
        isu_count         = pd.DataFrame(issue)
        isu_count.columns = [
            'countIssued'
        ]
        flags.append(isu_count)
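        
        #flags now holds three per-shape columns: the utilization index,
        #the active-BBL count, and the issued-BBL count for month i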
        
        for flag in flags:
            flag.crs = shp_gdf.crs

            shp_gdf = shp_gdf.merge(flag,
                                    how="left", left_on='NAME', right_index=True)
        shp_gdf['month'] = i
        
        #head lists the columns retained in the output
        head = ['NAME', 'Util_Indx_BBL',
                'countBBL', 'countIssued',
                'month', 'geometry']
        shp_gdf = shp_gdf[head]
        
        print('Merging...')
        if supplement: #This is where your code is fed into the pipeline
            
            #Each supplement entry is a list: a function followed by its
            #extra args. To receive the time unit of analysis, pass the
            #literal string 'i=i' as the last item in your entry over on
            #lnks.py, and the loop will pass the current month as i.
            #Every other item after the function is passed through as a
            #positional arg, so entries of any length work.
            
            for supp_func in supplement:
                func, *args = supp_func
                if args and args[-1] == 'i=i':
                    shp_gdf = func(shp_gdf, raw, *args[:-1], i=i)
                else:
                    shp_gdf = func(shp_gdf, raw, *args)
                print(str(func) + ' is done.')
        
        
        if not os.path.exists(strmfile_pth):
            shp_gdf = shp_gdf.drop('geometry', axis=1)
            
            #Save, also verify re-read works
            shp_gdf.to_csv(strmfile_pth, encoding='utf-8', index=False)
            shp_gdf = pd.read_csv(strmfile_pth, encoding='utf-8', 
                                  engine='python')
        del shp_gdf, addr, sums, utl_inx, numer, denom, issue, raw #Free memory between months
        print('Merged month:', i)
        print()
        
    #Done iterating through months; now stack the per-month stream files
    pth = './data/strm_file/' #Path of the stream files
    frames = []
    for file in sorted(os.listdir(pth)):
        try:
            filepth = os.path.join(pth, file)
            print([os.path.getsize(filepth), filepth])
            fl = pd.read_csv(filepth, encoding='utf-8', engine='python') #Read the stream file
            frames.append(fl) #This does the stacking
        except IsADirectoryError:
            continue
    out_gdf = pd.concat(frames)
    out_gdf.to_csv('./data/' + shapetype + '_out.csv') #Save the stacked output
    #shutil.rmtree('./data/strm_file/') #Uncomment to clear the per-month stream files
    
    print('Done!')
    return [bbl_df, addr_df, out_gdf] #For testing now; trim the extras later
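
A hypothetical illustration of the supplement spec the loop above consumes (the real entries live in lnks.py as lnks.supplm): each entry is a function followed by its extra args, the function receives (shp_gdf, raw, ...) and returns the augmented shp_gdf, and a literal 'i=i' in the last slot asks the loop to pass the current month as the keyword i.

In [ ]:
def month_echo(shp_gdf, raw, colname, i=None):
    #Toy supplement function (hypothetical): stamp every row of the
    #shape-level frame with the month currently being processed
    shp_gdf[colname] = i
    return shp_gdf

example_supplm = [
    [month_echo, 'echoMonth', 'i=i'], #'i=i' -> loop passes i=current month
]
#data_pipeline('anc', BBL12_17CSV, supplement=example_supplm, dex=dex)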

In [ ]:
dex = addr_shape('anc')

In [ ]:
sets = data_pipeline('anc', BBL12_17CSV, supplement=lnks.supplm, dex=dex, ts_lst_range=None)


Month 2012.01
Skipping, 2012.01 stream file path already exists:
./data/strm_file/2012.01_anc.csv
Month 2012.02
Skipping, 2012.02 stream file path already exists:
./data/strm_file/2012.02_anc.csv

(... the same skip message repeats for every month through 2014.06 ...)

Month 2014.07
Merging...
<function ITSPExtract at 0x7fee9bea9ea0> is done.
<function clim_ingest at 0x7fee9bea9e18> is done.
<function oecdGdpQs at 0x7fee9bea9d08> is done.

In [ ]:
sets[2].columns #Output columns; the row count should equal number of shapes * number of months
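
A quick, hypothetical sanity check of that claim (it assumes every shape appears in every month of the output):

In [ ]:
len(sets[2]) == sets[2]['NAME'].nunique() * sets[2]['month'].nunique()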

In [ ]: