Merge Craigslist data with synthetic census data

Process in batches, one state at a time, or pull data by region.

Steps:

  1. Get Craigslist data
    • Query in batches based on FIPS
    • Join to census via FIPS code
  2. Get census data
    • Aggregate to block group (BG)
  3. Merge together

TODO:

  • The census data for DC are missing. The files are there but they contain no data.
  • Put connection passwords in a separate text file (a sketch follows the database connection cell below).
  • If we need education or other person-level data, aggregate person data to household and then to block group (see the commented sketch in run_all below).

In [ ]:
import pandas as pd
import psycopg2
import paramiko
import os
import numpy as np

Craigslist data table columns

pid          | character varying(25)
date         | date
region       | character varying(50)
neighborhood | character varying(200)
rent         | double precision
bedrooms     | double precision
sqft         | double precision
rent_sqft    | double precision
longitude    | double precision
latitude     | double precision
county       | character varying(20)
fips_block   | character varying(20)
state        | character varying(20)
bathrooms    |

FIPS code format

53-033-001701-1-015

[state]-[county]-[tract]-[bg]-[block]

Note: for DC, county='001'
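
A quick illustration of slicing this 15-character code in Python (matching how get_craiglist splits fips_block later):


In [ ]:
fips_block = '530330017011015'   # the example above without separators
state  = fips_block[:2]     # '53' = Washington
county = fips_block[2:5]    # '033' = King County
tract  = fips_block[5:11]   # '001701'
bg     = fips_block[11]     # '1' (block group digit)
block  = fips_block[-4:]    # '1015' = 4-char block, includes the bg digit
fips12 = fips_block[:-3]    # 12-digit block-group FIPS used for merging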

Synthetic census data - variables

Household data: household_id,serialno,persons,cars,income,race_of_head,age_of_head,workers,state,county,tract,block group,children,tenure,recent_mover

Person data: person_id,member_id,age,relate,edu,sex,hours,earning,race_id,household_id,student,work_at_home,worker

Remote connection parameters


In [ ]:
data_dir='../data/' 
"""Path to local data directory"""

username='cy290e'
hostname='169.229.154.119'
db_name='craigslist'
password='' #password to database.  IMPORTANT: do not save passwords in the notebook
"""Postgres connection parameters"""

# establish postgres connection
conn = psycopg2.connect("dbname={d} user={u} host={h} password={pw}".format(d=db_name, u=username, h=hostname, pw=password))
cur = conn.cursor()
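
Per the passwords TODO above, a minimal sketch of reading the password from a separate text file ('db_password.txt' is a hypothetical filename; keep it out of version control):


In [ ]:
# read the password from an untracked text file ('db_password.txt' is hypothetical)
with open(os.path.join(data_dir, 'db_password.txt')) as f:
    password = f.read().strip()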

In [ ]:
hostname = '169.229.154.119'
username = 'cy290e'
password  = ''  # SSH password. IMPORTANT: do not save passwords in the notebook
local_key_dir = '~/.ssh/known_hosts' # path to local known_hosts file
"""SSH connection parameters"""

census_dir = 'synthetic_population/'
"""Remote directory with census data"""

results_dir = 'craigslist_census/'
"""Remote directory for results"""

# establish SSH connection
ssh = paramiko.SSHClient()
ssh.load_host_keys(os.path.expanduser(local_key_dir))  # expand '~', since paramiko won't
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname,username=username, password=password)
sftp = ssh.open_sftp()
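
As a quick connectivity check, peek at a few files in the remote census directory (filenames should start with 'hh_' or 'p_', which is what get_census_by_state relies on below):


In [ ]:
print(sorted(sftp.listdir(census_dir))[:5])  # first few remote census files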

Create FIPS look-up tables


In [ ]:
# make dictionary of states and fips codes. 
fips_state = pd.read_csv(data_dir+'state_fips_codes.csv',dtype=str)
fips2state=dict(zip(fips_state['FIPS'],fips_state['USPS']))
state2fips=dict(zip(fips_state['USPS'],fips_state['FIPS']))

# Make lookup for county to MPO code 
mpo_counties = pd.read_csv(data_dir+'us_2015_mpo_regions_counties_v1.csv', encoding='latin1', dtype={'MPO_ID':str,'COUNTYFP':str,'STFIPS':str})
mpo_counties['COUNTYFP'] = mpo_counties['COUNTYFP'].str.zfill(3)  # pad county FIPS to 3 chars
mpo_counties['st_co_fips'] = mpo_counties['STFIPS']+mpo_counties['COUNTYFP']  # join key: 2-char state + 3-char county FIPS
county2mpo=dict(zip(mpo_counties['st_co_fips'],mpo_counties['MPO_ID']))  # do we want MPO_ID or do we want GEOID? 
mpo_counties.head()
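
Quick sanity checks on the look-ups (exact values depend on the source CSVs):


In [ ]:
print(state2fips.get('CA'))                      # expect California's 2-digit state FIPS, '06'
print(fips2state.get(state2fips.get('CA')))      # should round-trip back to 'CA'
print(county2mpo.get('06075', 'not in an MPO'))  # San Francisco County, if present in the MPO file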

In [ ]:
def run_query(q):
    """ Get results given SQL query"""
    cur.execute(q)
    return(cur.fetchall())

def get_craiglist(filters):
    """Get Craigslist data from database.
    Args:
        filters (list): list of strings containing filter criteria, e.g. ["region='sandiego'", "rent>100"]
    Returns:
        DataFrame: listings data.
    """
    filters_str = ' AND '.join(filters)
    q="SELECT pid,date,rent,bedrooms,bathrooms,sqft,rent_sqft,fips_block,state,region,longitude,latitude FROM rental_listings WHERE {};".format(filters_str)
    results=run_query(q)
    df=pd.DataFrame(results,columns=['listing_id','date','rent','bedrooms','bathrooms','sqft','rent_sqft','fips_block','state','region','lng','lat'])  # put it all into a dataframe

    # split the 15-char FIPS: last 4 chars are the block (incl. the bg digit); first 12 chars form the block-group FIPS
    df['block']=df.fips_block.str[-4:]
    df['fips12']=df.fips_block.str[:-3]
    return(df)

def read_census_file(fname):
    """Read census csv file via SFTP and return as dataframe."""
    with sftp.open(census_dir+fname) as f:
        df = pd.read_csv(f, delimiter=',',dtype={'age_of_head':float, 'block group':str, 'cars':float, 'children':float, 'county':str,
           'household_id':str, 'income':float, 'persons':float, 'race_of_head':str, 'recent_mover':str,
           'serialno':str, 'state':str, 'tenure':str, 'tract':str, 'workers':float})
    return df

def write_results_file(data,fname):
    """Write merged data to csv file via SFTP."""
    with sftp.open(results_dir+fname,'w') as f:
        data.to_csv(f,index=True)
    return

def get_census_by_state(state, table='households'):
    """Return all census data for a state given its two-char abbreviation. `table` can be 'households' or 'persons'."""
    filelist=sftp.listdir(census_dir)
    if table=='households':
        files = [f for f in filelist if f.startswith('hh_{}'.format(state))]
    elif table=='persons':
        files = [f for f in filelist if f.startswith('p_{}'.format(state))]
    #files = files[:3]  # uncomment this line for testing.
    dfs = [read_census_file(f) for f in files]
    return pd.concat(dfs) if dfs else pd.DataFrame()

def strip_zeros(s):
    """Remove '.0' from the end of a string, e.g. '1.0' -> '1'."""
    if s.endswith('.0'):
        return(s[:-2])
    else:
        return(s)

def format_hh_data(df):
    """Fix formatting for hhs census data. Replace '' strings with zero. Format other strings."""

    df['county'] = df['county'].str.zfill(3)  # pad county to 3-char string
    
    for col in ['children','workers']:
        df[col] = df[col].replace('','0')

    for col in ['race_of_head','recent_mover','tenure']:
        df[col] = df[col].astype(str)
        df[col] = df[col].map(strip_zeros)  # make sure strings are formatted. 
    return(df)

def aggregate_census(df, groupby_cols=['county','tract','block group'],cols_to_sum=['cars','children','persons','workers'], cols_to_median=['age_of_head','income'],categ_cols=['race_of_head','recent_mover','tenure'],id_col='serialno',table='hhs'):
    """Aggregate census table to block group. Made this for hh data, may need to revised for persons data.
    Args: 
        groupby_cols (list): names of columns to group by (default=['county','tract','block group'])
        cols_to_sum (list): names of columns for which to compute totals. 
        cols_to_median (list): names of columns for which to compute medians
        categ_cols (list): names of categorical columns
        id_col (str): name of column that serves as the id column, to use in counting rows. 
        table (str): 'hhs' (default) or 'per'
    Returns: 
        DataFrame: aggregated data. 
        """
    # For some columns we'll want to find the sum or average/median. These will need only a simple groupby 
    sums = df.groupby(by=groupby_cols).sum()[cols_to_sum]
    sums.columns = [x+'_tot' for x in cols_to_sum]
    
    medians = df.groupby(by=groupby_cols).median()[cols_to_median]
    medians.columns = [x+'_med' for x in cols_to_median]
    
    counts = pd.DataFrame(df.groupby(by=groupby_cols).count()[id_col])
    counts.columns=[table+'_tot']

    # Categorical columns will need pivot tables. 
    categoricals = pd.DataFrame(index=counts.index)
    for col in categ_cols:
        pivoted=df.pivot_table(index = groupby_cols, columns = col, aggfunc='count')[id_col]
        pivoted.columns = [col+'_'+x for x in pivoted.columns]
        pivoted.columns = pivoted.columns.map(strip_zeros)
        # merge back together
        categoricals = pd.merge(categoricals, pivoted, left_index=True, right_index=True)

    # put all back together in one table
    merged = pd.merge(sums, medians, left_index=True, right_index=True)
    merged = pd.merge(merged, counts, left_index=True, right_index=True)
    merged = pd.merge(merged, categoricals, left_index=True, right_index=True)

    # check lengths of dataframes to detect any problems in grouping or merging
    lengths = [len(sums),len(medians),len(counts),len(categoricals),len(merged)]
    if len(set(lengths))>1:
        print('Warning: Aggregated tables have different lengths.',lengths,'for sums, medians, counts, categoricals, and merged.')
    
    return(merged)

def match_mpo(s, mpo_dict=county2mpo):
    """Match a 5-char state-county FIPS code to an MPO code
    Args: 
        s (str): 5-char state-county string
        mpo_dict (dict): county2mpo dictionary
    Returns: 
        str: MPO code
    """
    try: 
        return mpo_dict[s]
    except KeyError: # in this case, the county is not in an MPO
        return ''
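
As a quick check of how get_craiglist assembles its SQL WHERE clause (no database access needed):


In [ ]:
filters = ["state='CA'", "region='sfbay'", "rent>0"]
print('... WHERE {};'.format(' AND '.join(filters)))
# prints: ... WHERE state='CA' AND region='sfbay' AND rent>0;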

In [ ]:
def run_all(state, filters=None):
    """Get craigslist data and merge with census data, by state, and save.  with additional filters if needed. 
    Args: 
        state (str): 2-char state abbreviation
        filters (list): additional filters. Do not need to include state in filter list"""
    # load and prepare craiglist data
    if filters:
        filters.append("state='{}'".format(state))
        print(filters)
        df_cl=get_craiglist(filters)
    else:
        df_cl=get_craiglist(["state='{}'".format(state)])
    df_cl['st_co_fips'] = df_cl.fips_block.map(lambda x: x[:5])
    df_cl['mpo_id'] = df_cl.st_co_fips.map(match_mpo)

    # load and prepare census data for households
    hhs = get_census_by_state(state, table='households')
    hhs = format_hh_data(hhs)
    hhs_bg = aggregate_census(hhs)
    hhs_bg=hhs_bg.reset_index()
    hhs_bg['fips12']=state2fips[state]+hhs_bg['county']+hhs_bg['tract']+hhs_bg['block group'] # create 12-digit FIPS code for merging. 

    # merge with craigslist data. 
    merged = pd.merge(df_cl, hhs_bg, on='fips12',how='left')
    merged = merged.set_index('listing_id')

    #TODO: add persons data here, if needed. 
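    # A possible (untested) sketch using the persons columns listed above:
    #   per = get_census_by_state(state, table='persons')
    #   per_hh = per.groupby('household_id')[['worker','student']].sum()
    #   hhs_per = pd.merge(hhs, per_hh, left_on='household_id', right_index=True, how='left')
    #   ...then aggregate hhs_per to block group with aggregate_census(), as above.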

    cols_to_keep=['date','rent','bedrooms','bathrooms','sqft','rent_sqft','fips_block','state','region','mpo_id',
                  'lng','lat','cars_tot','children_tot','persons_tot','workers_tot','age_of_head_med','income_med',
                  'hhs_tot','race_of_head_1','race_of_head_2','race_of_head_3','race_of_head_4','race_of_head_5',
                  'race_of_head_6','race_of_head_7','race_of_head_8','race_of_head_9','recent_mover_0',
                  'recent_mover_1','tenure_1','tenure_2']
    # this is necessary because some columns may be missing in some states. 
    for col in cols_to_keep: 
        if col not in merged.columns:
            merged[col] = np.nan

    print('Saving data for {s}: {m} rows'.format(s=state,m=len(merged)))
    outfile = 'cl_census_{}.csv'.format(state)
    #merged[cols_to_keep].to_csv(data_dir+outfile, index=True)  # uncomment to save locally

    #write_results_file(merged[cols_to_keep], outfile)  # uncomment to save remotely. 
    return merged[cols_to_keep]

Get data for a single region


In [ ]:
df_bayarea = run_all(state='CA',filters=["region = 'sfbay'","rent>0"])   # define whatever filters you want here.
df_bayarea.head()

In [ ]:
outfile = 'sfbay_listings_03032017'
df_bayarea.to_csv(data_dir+outfile, index=True)  # save locally

Process all data by state


In [ ]:
for state in fips_state['USPS']:  # loop over all states
    if state != 'DC':   # the DC census data is missing. 
        print('\n Working on',state)
        df_state = run_all(state)

In [ ]:
df_state.head()

In [ ]:
ssh.close()
conn.close()
