CSE 6040, Fall 2015 [10-B]: CSV to DB

This notebook is optional and was not covered in class. It shows how we converted the raw comma-separated values (CSV) file corresponding to the NYC 311 Complaints database into an SQLite database. It is adapted from the code that appears here.

Setup: sqlalchemy and pandas


In [ ]:
from sqlalchemy import create_engine # database connection
import datetime as dt # for timing
import pandas as pd # for data frames

Input (CSV) filename


In [ ]:
#CSV_FILE = 'NYC-311-2M.csv'
CSV_FILE = None

Determine output (SQLite DB) filename from the input filename


In [ ]:
assert CSV_FILE

import re
CSV_BASES = re.findall (r'(.*)\.csv$', CSV_FILE, re.I)

assert len (CSV_BASES) >= 1
CSV_BASE = CSV_BASES[0]

DB_FILE = "%s.db" % CSV_BASE

print ("Converting: %s to %s (an SQLite DB) ..." % (CSV_FILE, DB_FILE))

Connect to an SQL data source


In [ ]:
disk_engine = create_engine ('sqlite:///%s' % DB_FILE)

Convert CSV to SQLite DB

  1. Load the CSV, chunk-by-chunk, into a DataFrame
  2. Process each chunk by doing some minimal data normalization and stripping out uninteresting columns
  3. Append each chunk to the SQLite database

In [ ]:
# Convert .csv to .db

CHUNKSIZE = 25000 # Number of rows to read at a time

# List of columns to keep
KEEP_COLS = ['Agency', 'CreatedDate', 'ClosedDate', 'ComplaintType',
             'Descriptor', 'TimeToCompletion', 'City']

start = dt.datetime.now () # start timer

j = 0 # Number of chunks processed so far
index_start = 1 # Starting index for the rows of the next chunk
for df in pd.read_csv(CSV_FILE, chunksize=CHUNKSIZE, iterator=True, encoding='utf-8'):
    
    # Remove spaces from columns
    df = df.rename(columns={c: c.replace(' ', '') for c in df.columns})

    # Convert to proper date+timestamps
    df['CreatedDate'] = pd.to_datetime(df['CreatedDate'])
    df['ClosedDate'] = pd.to_datetime(df['ClosedDate'])

    # Shift this chunk's index so that row IDs are unique across chunks
    df.index += index_start

    # Keep only the columns of interest
    for c in df.columns:
        if c not in KEEP_COLS:
            df = df.drop (c, axis=1)
    
    j += 1
    print ('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*CHUNKSIZE))

    # Append this chunk to the 'data' table of the SQLite DB
    df.to_sql('data', disk_engine, if_exists='append')
    index_start = df.index[-1] + 1 # the next chunk's indices pick up where this one left off
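
Check the result (optional)

As a quick sanity check, you can query the new database directly. The cell below is a minimal sketch, assuming the conversion above completed and that the table is named 'data' (as in the to_sql call): it counts the rows and previews a few records.


In [ ]:
# Sanity check (a sketch): reuse DB_FILE and disk_engine from above
num_rows = pd.read_sql_query ('SELECT COUNT(*) AS num_rows FROM data', disk_engine)
print (num_rows)

# Peek at a few records to confirm the kept columns and date conversions
preview = pd.read_sql_query ('SELECT * FROM data LIMIT 5', disk_engine)
print (preview)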