Steps:
Process listings in batches, one state at a time, or pull data by region, then merge with census block-group data and save the results.
In [4]:
import pandas as pd
import psycopg2
import paramiko
import os
import numpy as np
import json
import zipfile
Columns in the rental_listings table:

 pid          | character varying(25)
 date         | date
 region       | character varying(50)
 neighborhood | character varying(200)
 rent         | double precision
 bedrooms     | double precision
 sqft         | double precision
 rent_sqft    | double precision
 longitude    | double precision
 latitude     | double precision
 county       | character varying(20)
 fips_block   | character varying(20)
 state        | character varying(20)
 bathrooms    |
FIPS code format
53-----033---001701--1--015
[state][county][tract][bg][block]
Note: for DC, county='001'
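For reference, a quick sketch of splitting a 15-char block FIPS code into its components; the helper name here is just for illustration.
In [ ]:
# illustrative helper: split a 15-char block FIPS code into its parts
def split_fips_code(code):
    return {'state': code[:2], 'county': code[2:5], 'tract': code[5:11],
            'bg': code[11], 'block': code[12:]}

split_fips_code('530330017011015')  # {'state': '53', 'county': '033', 'tract': '001701', 'bg': '1', 'block': '015'}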
In [82]:
DATA_DIR = os.path.join('..', 'data')
"""Path to local data directory"""
# read postgres connection parameters
with open('postgres_settings.json') as settings_file:
    settings = json.load(settings_file)
DBNAME = settings['dbname']
USER = settings['user']
HOST = settings['host']
PASSWORD = settings['password']
conn_str = "dbname = {0} user = {1} host = {2} password = {3}".format(DBNAME, USER, HOST, PASSWORD)
try:
    conn = psycopg2.connect(conn_str)
    cur = conn.cursor()
except psycopg2.OperationalError:
    print("Cannot connect. Check settings.")
In [13]:
# TODO: add PuTTY connection too.
# read SSH connection parameters
with open('ssh_settings.json') as settings_file:
    settings = json.load(settings_file)
HOSTNAME = settings['hostname']
USERNAME = settings['username']
PASSWORD = settings['password']
LOCAL_KEY_DIR = settings['local_key_dir']
CENSUS_DIR = 'synthetic_population'
"""Remote directory with census data"""
RESULTS_DIR = 'craigslist_census'
"""Remote directory for results"""
# establish SSH connection
ssh = paramiko.SSHClient()
ssh.load_host_keys(LOCAL_KEY_DIR)
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOSTNAME, username=USERNAME, password=PASSWORD)
sftp = ssh.open_sftp()
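A quick sanity check on the SFTP connection, assuming the remote census directory exists:
In [ ]:
# list a few files in the remote census directory
sftp.listdir(CENSUS_DIR)[:5]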
In [14]:
BLOCK_DIR = os.path.join('..','data','urbansim')
BLOCK_ZFILE = 'ba_block_variables.csv.zip'
BLOCK_FILE = 'ba_block_variables.csv'
In [15]:
# make dictionaries mapping between state abbreviations and FIPS codes
fips_state = pd.read_csv(os.path.join(DATA_DIR, 'state_fips_codes.csv'), dtype=str)
fips2state = dict(zip(fips_state['FIPS'], fips_state['USPS']))
state2fips = dict(zip(fips_state['USPS'], fips_state['FIPS']))
# Make lookup from county to MPO code
mpo_counties = pd.read_csv(os.path.join(DATA_DIR, 'us_2015_mpo_regions_counties_v1.csv'), encoding='latin1', dtype={'MPO_ID': str, 'COUNTYFP': str, 'STFIPS': str})
mpo_counties['COUNTYFP'] = mpo_counties['COUNTYFP'].str.zfill(3)  # county FIPS is a 3-char string
mpo_counties['st_co_fips'] = mpo_counties['STFIPS'] + mpo_counties['COUNTYFP']  # we will join on 2-char state + 3-char county fips
county2mpo = dict(zip(mpo_counties['st_co_fips'], mpo_counties['MPO_ID']))  # do we want MPO_ID or do we want GEOID?
mpo_counties.head()
Out[15]:
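A quick check of the lookups; the codes here are illustrative ('06075' is San Francisco County, CA).
In [ ]:
# example lookups
print(state2fips['CA'], fips2state['06'])
print(county2mpo.get('06075', ''))  # '' if the county is not in an MPO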
In [16]:
def run_query(q):
    """Get results for a SQL query."""
    cur.execute(q)
    return cur.fetchall()
def get_craiglist(filters, split_fips=True):
    """Get Craigslist data from the database.
    Args:
        filters (list): list of strings containing filter criteria, formatted as individual SQL WHERE clauses, e.g. ["region='sandiego'", "rent>100"]
        split_fips (bool): if True, split the FIPS code into block and fips12 columns (useful when merging with block-group data)
    Returns:
        DataFrame: listings data.
    """
    filters_str = ' AND '.join(filters)
    q = "SELECT pid,date,rent,bedrooms,bathrooms,sqft,rent_sqft,fips_block,state,region,longitude,latitude FROM rental_listings WHERE {};".format(filters_str)
    results = run_query(q)
    df = pd.DataFrame(results, columns=['listing_id', 'date', 'rent', 'bedrooms', 'bathrooms', 'sqft', 'rent_sqft', 'fips_block', 'state', 'region', 'lng', 'lat'])  # put it all into a dataframe
    if split_fips:
        # block = last 4 chars (census block number; its first digit is the block group)
        # fips12 = first 12 chars (the block-group GEOID)
        df['block'] = df.fips_block.str[-4:]
        df['fips12'] = df.fips_block.str[:-3]
    return df
def read_census_file(fname):
    """Read a census csv file via SFTP and return it as a DataFrame."""
    with sftp.open(os.path.join(CENSUS_DIR, fname)) as f:
        df = pd.read_csv(f, delimiter=',', dtype={'age_of_head': float, 'block group': str, 'cars': float, 'children': float, 'county': str,
                                                  'household_id': str, 'income': float, 'persons': float, 'race_of_head': str, 'recent_mover': str,
                                                  'serialno': str, 'state': str, 'tenure': str, 'tract': str, 'workers': float})
    return df

def write_results_file(data, fname):
    """Write merged data to a csv file via SFTP."""
    with sftp.open(os.path.join(RESULTS_DIR, fname), 'w') as f:
        data.to_csv(f, index=True)
    return
def get_census_by_state(state, table='households'):
    """Return all census data for a state, given its two-char abbreviation. `table` can be 'households' or 'persons'."""
    filelist = sftp.listdir(CENSUS_DIR)
    if table == 'households':
        files = [f for f in filelist if f.startswith('hh_{}'.format(state))]
    elif table == 'persons':
        files = [f for f in filelist if f.startswith('p_{}'.format(state))]
    # files = files[:3]  # uncomment this line for testing.
    new_df = pd.DataFrame()
    for f in files:
        df = read_census_file(f)
        new_df = pd.concat([new_df, df])
    return new_df
def strip_zeros(s):
    """Remove '.0' from the end of a string."""
    if s.endswith('.0'):
        return s[:-2]
    else:
        return s

def format_hh_data(df):
    """Fix formatting for the households census data. Replace empty strings with zero and clean up string columns."""
    df['county'] = df['county'].str.zfill(3)  # make county a 3-char string.
    for col in ['children', 'workers']:
        df[col] = df[col].replace('', '0')
    for col in ['race_of_head', 'recent_mover', 'tenure']:
        df[col] = df[col].astype(str)
        df[col] = df[col].map(strip_zeros)  # make sure strings are formatted.
    return df
def aggregate_census(df, groupby_cols=['county', 'tract', 'block group'],
                     cols_to_sum=['cars', 'children', 'persons', 'workers'],
                     cols_to_median=['age_of_head', 'income'],
                     categ_cols=['race_of_head', 'recent_mover', 'tenure'],
                     id_col='serialno', table='hhs'):
    """Aggregate a census table to the block-group level. Written for the households data; may need to be revised for the persons data.
    Args:
        groupby_cols (list): names of columns to group by (default=['county','tract','block group'])
        cols_to_sum (list): names of columns for which to compute totals
        cols_to_median (list): names of columns for which to compute medians
        categ_cols (list): names of categorical columns
        id_col (str): name of the id column, used for counting rows
        table (str): 'hhs' (default) or 'per'
    Returns:
        DataFrame: aggregated data.
    """
    # For some columns we want sums or medians. These need only a simple groupby.
    sums = df.groupby(by=groupby_cols).sum()[cols_to_sum]
    sums.columns = [x + '_tot' for x in cols_to_sum]
    medians = df.groupby(by=groupby_cols).median()[cols_to_median]
    medians.columns = [x + '_med' for x in cols_to_median]
    counts = pd.DataFrame(df.groupby(by=groupby_cols).count()[id_col])
    counts.columns = [table + '_tot']
    # Categorical columns need pivot tables: one count column per category value.
    categoricals = pd.DataFrame(index=counts.index)
    for col in categ_cols:
        pivoted = df.pivot_table(index=groupby_cols, columns=col, aggfunc='count')[id_col]
        pivoted.columns = [col + '_' + x for x in pivoted.columns]
        pivoted.columns = pivoted.columns.map(strip_zeros)
        # merge back together
        categoricals = pd.merge(categoricals, pivoted, left_index=True, right_index=True)
    # put everything back together in one table
    merged = pd.merge(sums, medians, left_index=True, right_index=True)
    merged = pd.merge(merged, counts, left_index=True, right_index=True)
    merged = pd.merge(merged, categoricals, left_index=True, right_index=True)
    # check lengths of dataframes to detect any problems in grouping or merging
    lengths = [len(sums), len(medians), len(counts), len(categoricals), len(merged)]
    if len(set(lengths)) > 1:
        print('Warning: aggregated tables have different lengths:', lengths, 'for sums, medians, counts, categoricals, and merged.')
    return merged
def match_mpo(s, mpo_dict=county2mpo):
    """Match a 5-char state-county FIPS code to an MPO code.
    Args:
        s (str): 5-char state-county string
        mpo_dict (dict): county2mpo dictionary
    Returns:
        str: MPO code, or '' if the county is not in an MPO
    """
    try:
        return mpo_dict[s]
    except KeyError:  # in this case, the county is not in an MPO
        return ''
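A small smoke test of the helpers defined above, assuming the database connection is live:
In [ ]:
# smoke test: pull a few listings and check the FIPS split
sample = get_craiglist(["region='sfbay'", "rent>100"])
print(len(sample))
sample[['fips_block', 'fips12', 'block']].head()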
In [10]:
def run_all(state, filters=None):
    """Get Craigslist data for a state, merge it with census data, and save, applying additional filters if given.
    Args:
        state (str): 2-char state abbreviation
        filters (list): additional filters; no need to include the state in the filter list
    """
    # load and prepare craigslist data
    # If filters are provided, use them in addition to the state filter.
    if filters:
        filters.append("state='{}'".format(state))
        print(filters)
        df_cl = get_craiglist(filters)
    # If no filters are provided, get all data for the state.
    else:
        df_cl = get_craiglist(["state='{}'".format(state)])
    df_cl['st_co_fips'] = df_cl.fips_block.map(lambda x: x[:5])
    df_cl['mpo_id'] = df_cl.st_co_fips.map(match_mpo)
    # load and prepare census data for households
    hhs = get_census_by_state(state, table='households')
    hhs = format_hh_data(hhs)
    hhs_bg = aggregate_census(hhs)
    hhs_bg = hhs_bg.reset_index()
    hhs_bg['fips12'] = state2fips[state] + hhs_bg['county'] + hhs_bg['tract'] + hhs_bg['block group']  # create 12-digit FIPS code for merging.
    # merge with craigslist data.
    merged = pd.merge(df_cl, hhs_bg, on='fips12', how='left')
    merged = merged.set_index('listing_id')
    # TODO: add persons data here, if needed.
    # Keep only the columns we'll need.
    cols_to_keep = ['date', 'rent', 'bedrooms', 'bathrooms', 'sqft', 'rent_sqft', 'fips_block', 'state', 'region', 'mpo_id', 'lng', 'lat',
                    'cars_tot', 'children_tot', 'persons_tot', 'workers_tot', 'age_of_head_med', 'income_med', 'hhs_tot',
                    'race_of_head_1', 'race_of_head_2', 'race_of_head_3', 'race_of_head_4', 'race_of_head_5', 'race_of_head_6',
                    'race_of_head_7', 'race_of_head_8', 'race_of_head_9', 'recent_mover_0', 'recent_mover_1', 'tenure_1', 'tenure_2']
    # In case some columns are missing in some states, add them as NaN.
    for col in cols_to_keep:
        if col not in merged.columns:
            merged[col] = np.nan
    # save the file either locally or remotely.
    print('Saving data for {s}: {m} rows'.format(s=state, m=len(merged)))
    outfile = 'cl_census_{}.csv'.format(state)
    # merged[cols_to_keep].to_csv(os.path.join(DATA_DIR, outfile), index=True)  # uncomment to save locally
    # write_results_file(merged[cols_to_keep], outfile)  # uncomment to save remotely
    return merged[cols_to_keep]
In [11]:
df_bayarea = run_all(state='CA',filters=["region = 'sfbay'","rent>0"]) # define whatever filters you want here.
df_bayarea.head()
In [ ]:
# save locally
outfile = 'sfbay_listings_04282017.csv'
df_bayarea.to_csv(os.path.join(DATA_DIR,outfile), index=True)
In [ ]:
# run all states (restrict this list while testing)
for state in fips_state['USPS']:
    if state != 'DC':  # the DC census data is missing.
        print('\nWorking on', state)
        df_state = run_all(state)
In [ ]:
df_state.head()
In [ ]:
ssh.close()
In [17]:
# first unzip the csv file into a temp dir
os.makedirs('temp', exist_ok=True)  # make temp dir for unzipped files
with zipfile.ZipFile(os.path.join(BLOCK_DIR, BLOCK_ZFILE), 'r') as zip_ref:
    zip_ref.extractall('temp')
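Quick check that the csv landed in the temp dir:
In [ ]:
os.path.exists(os.path.join('temp', BLOCK_FILE))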
In [18]:
# temporarily read first 100 lines just to see header names
df_temp = pd.read_csv(os.path.join('temp',BLOCK_FILE), nrows=100, dtype={'block_id':str})
print(df_temp.shape)
df_temp.head()
Out[18]:
In [19]:
# define the columns we need
block_cols = df_temp.columns
In [26]:
#df_temp.columns[:100]
Certain columns are definitely not useful, e.g. 'puma10_id_is_0609502', 'puma10_id_is_0609503', 'tracts_mean_y', 'tracts_mean_x'.
TODO: spend more time choosing features.
In [21]:
# make sure not to include unneeded columns like these:
unneeded_cols = ([x for x in block_cols if x.startswith('puma10_id_is_')]
                 + [x for x in block_cols if x.endswith(('mean_y', 'mean_x'))]
                 + [x for x in block_cols if x.endswith(('std_y', 'std_x'))]
                 + [x for x in block_cols if x.startswith('pumas_prop_sector_id')]
                 + [x for x in block_cols if x.startswith('county_id_is_')]
                 + [x for x in block_cols if x.startswith('tracts_prop_sector_id')]
                 + [x for x in block_cols if x.startswith('counties_prop_sector_id')])
In [ ]:
len(unneeded_cols)
In [33]:
cols_to_use = ['block_id','nodes_population_1500m','block_groups_total_jobs', 'block_groups_median_children',
'block_groups_median_income', 'prop_tenure_1', 'nodes_low_income_hh_1500m', 'nodes_high_income_hh_1500m',
'nodes_jobs_3000m','nodes_jobs_20km', 'nodes_population_400m', 'nodes_population_800m',
'block_groups_prop_race_of_head_1','block_groups_prop_race_of_head_2', 'block_groups_prop_race_of_head_3',
'block_groups_prop_race_of_head_7','block_groups_prop_race_of_head_8','block_groups_prop_race_of_head_6',
'pumas_density_residential_units','block_groups_density_jobs',
'nodes_jobs_1500m_4445','nodes_jobs_3000m_4445',
'nodes_du_5000m','nodes_du_800m','block_groups_median_rent',
'block_groups_median_persons', 'block_groups_median_age_of_head', 'nodes_ave_year_built_800m']
In [34]:
for col in cols_to_use:
if col not in block_cols:
print(col)
In [35]:
# Read all rows, using only the columns we want
df_blocks = pd.read_csv(os.path.join('temp',BLOCK_FILE),dtype={'block_id':str}, usecols = cols_to_use)
print(df_blocks.shape)
df_blocks.head()
Out[35]:
In [36]:
df_blocks['block_id'].head()
Out[36]:
In [37]:
df_listings = get_craiglist(filters = ["region='sfbay'","rent>100"])
In [38]:
# merge listings with block variables on block_id
print(len(df_listings), len(df_blocks))
df_merged = pd.merge(df_listings, df_blocks, left_on='fips_block', right_on='block_id', how='inner')
if len(df_merged) < len(df_listings):
    print('Warning: only {0} of {1} rows matched'.format(len(df_merged), len(df_listings)))
In [67]:
# save to hdf and csv
outfile = 'ba_listings.h5'
df_merged.to_hdf(os.path.join(DATA_DIR, outfile), key='merged')
outfile = 'ba_listings.csv'
df_merged.to_csv(os.path.join(DATA_DIR, outfile), index=False)
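A read-back check on the saved hdf file, assuming it was just written:
In [ ]:
# verify the hdf round-trips
pd.read_hdf(os.path.join(DATA_DIR, 'ba_listings.h5'), 'merged').shape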
We need to create a database table to hold the block-level variables for the rent-predictor app.
Steps:
Create the table schema, assuming all features are floats unless otherwise specified. (If many columns aren't floats, we might want to store the variable names and their types as JSON so we can refer back to it when we change the variables.)
Copy the data into the table. COPY is the fastest way to insert a large amount of data (https://www.postgresql.org/docs/current/static/populate.html).
In [90]:
# connect to the local database
DBNAME = settings['DBNAME_RENT']
# USER = settings['USER_RENT']
# PASSWORD = settings['PASSWORD_RENT']
conn_str = "dbname = {0}".format(DBNAME)
try:
    conn = psycopg2.connect(conn_str)
    cur = conn.cursor()
except psycopg2.OperationalError:
    print("Cannot connect. Check settings.")
In [54]:
# first save the data as csv; reuse the temp dir
FULL_PATH = '/Users/lisarayle/Dropbox/craigslist/src/'  # postgres COPY needs an absolute path on the server
csvfile = 'blocks_temp.csv'
df_blocks.to_csv(os.path.join('temp', csvfile), index=False)
table_name = 'block_vars'
In [91]:
def create_db_table(col_names, t_name, id_var='block_id'):
    """Create a new table with a schema to hold the block-level data.
    Args:
        col_names (list): names of columns to use; the first one can be 'block_id'
        t_name (str): name of the database table
        id_var (str): name of the id variable (default: 'block_id')
    """
    # drop the table if it already exists
    q = "DROP TABLE IF EXISTS {}".format(t_name)
    cur.execute(q)
    conn.commit()
    # build the SQL string: id column, then one real column per variable
    sql_begin = "CREATE TABLE {0} (id BIGSERIAL PRIMARY KEY, {1} varchar(15) not null, ".format(t_name, id_var)
    if col_names[0] == id_var:
        sql_middle = " real,".join(col_names[1:])  # leave off the id column if it's there.
    else:
        sql_middle = " real,".join(col_names)
    sql_end = " real);"
    q = sql_begin + sql_middle + sql_end
    cur.execute(q)
    conn.commit()
    return

def copy_block_data(col_names, t_name, fname):
    """Copy data from a csv file into the block variables table.
    Args:
        col_names (list): names of columns to use; the first one can be 'block_id'
        t_name (str): name of the database table
        fname (str): name of the csv file with the data
    """
    var_string = ','.join(col_names)
    q = "COPY {t}({v}) FROM '{f}' DELIMITER ',' CSV HEADER;".format(t=t_name, v=var_string, f=os.path.join(FULL_PATH, 'temp', fname))
    print(q)
    cur.execute(q)
    conn.commit()
    return
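To see what the generated SQL looks like before touching the real table, a sketch with hypothetical column names (creates and drops a throwaway table):
In [ ]:
# illustrative: build a tiny demo table, inspect its columns, then drop it
create_db_table(['block_id', 'var_a', 'var_b'], 'block_vars_demo')
print(run_query("select column_name from information_schema.columns where table_name='block_vars_demo';"))
cur.execute("DROP TABLE block_vars_demo;")
conn.commit()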
In [92]:
create_db_table(cols_to_use, table_name)
copy_block_data(cols_to_use, table_name, csvfile)
In [57]:
# test queries
q = "select count(*) from block_vars;"
run_query(q)
q = "select column_name from information_schema.columns where table_name='block_vars';"
run_query(q)
q = "select * from block_vars limit 10;"
run_query(q)
Out[57]:
In [72]:
cols_to_use
Out[72]:
In [78]:
DATA_DIR = '/Users/lisarayle/rent_predictor/data/'
"""Path to data directory"""
# read the file with variable names
infile = 'variables.txt'
with open(os.path.join(DATA_DIR, infile), 'r') as f:
    VARLIST = f.read().split(',')
print(VARLIST)
len(VARLIST)
Out[78]:
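If variables.txt ever needs regenerating, the write counterpart is straightforward; this sketch assumes the same comma-separated format and overwrites the same file.
In [ ]:
# write the variable list back out in the same comma-separated format
with open(os.path.join(DATA_DIR, 'variables.txt'), 'w') as f:
    f.write(','.join(VARLIST))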
In [ ]: