This notebook downloads zipped CEMS data from the EPA ftp server, extracts the data, corrects column names, combines the data, and exports one data file per year.
This code is the least polished of all the sections. The parts that identify new ftp files for download (comparing files already listed in already_downloaded against the upload dates on the ftp server) may require re-writing.
Because not all data needs to be re-downloaded for every update, check that the correct years are used in the download and processing loops below.
In [17]:
import io, time, json
import pandas as pd
import urllib
import re
import os
from os.path import join
import numpy as np
import ftplib
from ftplib import FTP
import timeit
import sys
from joblib import Parallel, delayed
cwd = os.getcwd()
data_path = join(cwd, '..', 'Data storage')
In [3]:
%load_ext watermark
%watermark -iv -v
In [4]:
# Load the "autoreload" extension
%load_ext autoreload
# always reload modules marked with "%aimport"
%autoreload 1
# add the 'src' directory as one where we can import modules
src_dir = join(os.getcwd(), os.pardir, 'src')
sys.path.append(src_dir)
In [23]:
%aimport Data.data_extraction
from Data.data_extraction import import_clean_epa
This downloads hourly data from the ftp server over a range of years, and saves all of the file names/last update times in a list. The downloads can take some time depending on how much data is being retrieved.
Some of the code below assumes that we only need to retrieve new or modified files. If you are retrieving this data for the first time, create an empty dataframe named already_downloaded with the columns file name and last updated.
In [6]:
# Replace the filename with whatever csv stores already downloaded file info
# path = join('EPA downloads', 'name_time 2015-2016.csv')
# already_downloaded = pd.read_csv(path, parse_dates=['last updated'])
# Uncomment the line below to create an empty dataframe
already_downloaded = pd.DataFrame(columns=['file name', 'last updated'])
In [7]:
already_downloaded.head()
Out[7]:
In [12]:
# Years of data to download
years = [2017]

# Make the EPA download directory if it doesn't exist
try:
    os.mkdir(join(data_path, 'EPA downloads'))
except OSError:
    pass

# Timestamp
start_time = timeit.default_timer()

name_time_list = []

# Open ftp connection and navigate to the correct folder
print('Opening ftp connection')
ftp = FTP('newftp.epa.gov')
ftp.login()
ftp.cwd('/dmdnload/emissions/hourly/monthly')

for year in years:
    print(year)
    year_str = str(year)
    print('Change directory to {}'.format(year_str))
    try:
        ftp.cwd(year_str)
    except ftplib.all_errors as e:
        print(e)
        break

    # Use ftplib to get the list of filenames
    print('Fetch filenames')
    fnames = ftp.nlst()

    # Create new directory path if it doesn't exist
    new_path = join(data_path, 'EPA downloads', year_str)
    try:
        os.mkdir(new_path)
        print('New local folder created for {}'.format(year_str))
    except OSError:
        pass

    # Look for files without _HLD in the name
    name_list = []
    time_list = []
    print('Find filenames without _HLD and time last updated')
    for name in fnames:
        if '_HLD' not in name:
            try:
                # The ftp command "MDTM" asks when a file was last modified.
                # It returns a code and the date/time.
                tm = ftp.sendcmd('MDTM ' + name).split()[-1]
                dt = pd.to_datetime(tm[:8], format='%Y%m%d')

                # Keep the file if it hasn't been downloaded yet, or if the
                # last modified date has changed since the last download
                if name not in already_downloaded['file name'].values:
                    time_list.append(dt)
                    name_list.append(name)
                elif (already_downloaded
                        .loc[already_downloaded['file name'] == name,
                             'last updated'].values[0] != dt):
                    time_list.append(dt)
                    name_list.append(name)
            except ftplib.all_errors as e:
                print(e)
                # If ftp.sendcmd didn't work, assume the connection was lost.
                # Reconnect and retry MDTM so the name and time lists stay
                # the same length.
                ftp = FTP('newftp.epa.gov')
                ftp.login()
                ftp.cwd('/dmdnload/emissions/hourly/monthly')
                ftp.cwd(year_str)
                tm = ftp.sendcmd('MDTM ' + name).split()[-1]
                time_list.append(pd.to_datetime(tm[:8], format='%Y%m%d'))
                name_list.append(name)

    # Store all filenames and update times
    print('Store names and update times')
    name_time_list.extend(zip(name_list, time_list))

    # Download and store data
    print('Downloading data')
    for name in name_list:
        try:
            with open(join(data_path, 'EPA downloads', year_str, name),
                      'wb') as f:
                ftp.retrbinary('RETR %s' % name, f.write)
        except ftplib.all_errors as e:
            print(e)
            # Reconnect and retry the download once
            try:
                ftp.quit()
            except ftplib.all_errors as e:
                print(e)
            ftp = FTP('newftp.epa.gov')
            ftp.login()
            ftp.cwd('/dmdnload/emissions/hourly/monthly')
            ftp.cwd(year_str)
            with open(join(data_path, 'EPA downloads', year_str, name),
                      'wb') as f:
                ftp.retrbinary('RETR %s' % name, f.write)

    print('Download finished')
    print(round((timeit.default_timer() - start_time)/60.0, 2), 'min so far')

    # Go back up a level on the ftp server
    ftp.cwd('..')

# Timestamp
elapsed = round((timeit.default_timer() - start_time)/60.0, 2)
In [19]:
print('Data download completed in {} mins'.format(elapsed))
In [9]:
name_time_df = pd.DataFrame(name_time_list, columns=['file name', 'last updated'])
In [10]:
name_time_df.head()
Out[10]:
In [21]:
path = join(data_path, 'EPA downloads', 'name_time example.csv')
name_time_df.to_csv(path, index=False)
In [15]:
import csv
import zipfile
from io import BytesIO
from collections import Counter
Survey the column names in every downloaded file. Reading just the first few rows takes ~100 ms per file.
In [18]:
base_path = join(data_path, 'EPA downloads')
num_cols = {}
col_names = {}
for year in range(2001, 2018):
    n_cols_list = []
    col_name_list = []
    path = join(base_path, str(year))
    fnames = os.listdir(path)
    for name in fnames:
        fullpath = os.path.join(path, name)
        columns = pd.read_csv(fullpath, nrows=5).columns

        # Add the column names to the large list
        col_name_list.extend(columns)

        # Add the number of columns to the list
        n_cols_list.append(len(columns))

    col_names[year] = Counter(col_name_list)
    num_cols[year] = Counter(n_cols_list)
From the table below, recent years always include the units after an emission name. Before 2009, some files include the units and some don't. UNITID is consistent across all years, but UNIT_ID (which is not the same thing) was added after 2008.
In [19]:
pd.DataFrame(col_names)
Out[19]:
In [26]:
pd.DataFrame(col_names).index
Out[26]:
In [25]:
pd.DataFrame(num_cols)
Out[25]:
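As a quick check of the UNITID/UNIT_ID point above (an illustrative snippet, not part of the original workflow), the per-year counts for those two columns can be pulled from the same dictionary:
In [ ]:
# Number of files per year containing UNITID vs UNIT_ID
# (NaN means the column never appears in that year's files)
pd.DataFrame(col_names).loc[['UNITID', 'UNIT_ID']]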
Using joblib for this.
Joblib on Windows requires the if __name__ == '__main__': guard, and in a Jupyter notebook the parallelized function needs to be imported from an external script. I probably should have done the parallel part at a higher level - the longest part is saving the csv files. Disabling a check could also speed up the process.
Joblib has to be at least version 0.10, which is only available through pip - I got errors with the version installed by conda.
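For reference, here is a rough sketch of what a function like import_clean_epa could look like - an illustration of the idea (read one zipped monthly file, rename columns with col_name_map, return the dataframe), not the actual implementation in Data.data_extraction.
In [ ]:
# Illustrative sketch only - not the actual import_clean_epa from
# Data.data_extraction. Assumes each downloaded zip holds a single csv
# that pandas can read directly (compression inferred from the extension).
def import_clean_epa_sketch(path, fname, col_name_map):
    df = pd.read_csv(os.path.join(path, fname))

    # Standardize older column names (e.g. add units) so files from
    # different years can be concatenated
    df = df.rename(columns=col_name_map)
    return df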
Create a dictionary mapping old column names to new ones. Any column name on the left (key) is replaced by the name on the right (value).
In [20]:
col_name_map = {'CO2_MASS' : 'CO2_MASS (tons)',
                'CO2_RATE' : 'CO2_RATE (tons/mmBtu)',
                'GLOAD' : 'GLOAD (MW)',
                'HEAT_INPUT' : 'HEAT_INPUT (mmBtu)',
                'NOX_MASS' : 'NOX_MASS (lbs)',
                'NOX_RATE' : 'NOX_RATE (lbs/mmBtu)',
                'SLOAD' : 'SLOAD (1000lb/hr)',
                'SLOAD (1000 lbs)' : 'SLOAD (1000lb/hr)',
                'SO2_MASS' : 'SO2_MASS (lbs)',
                'SO2_RATE' : 'SO2_RATE (lbs/mmBtu)'
                }
Be sure to change the start and end years.
In [26]:
years = [2016, 2017]

if __name__ == '__main__':
    start_time = timeit.default_timer()
    base_path = join(data_path, 'EPA downloads')

    for year in years:
        print('Starting', str(year))
        path = join(base_path, str(year))
        fnames = os.listdir(path)

        # Read and clean each monthly file in parallel
        df_list = Parallel(n_jobs=-1)(delayed(import_clean_epa)
                                      (path, name, col_name_map)
                                      for name in fnames)

        print('Combining data')
        df = pd.concat(df_list)

        print('Saving file')
        path_out = join(data_path, 'EPA emissions',
                        'EPA emissions {}.csv'.format(str(year)))
        df.to_csv(path_out, index=False)

        # File IO is much faster with the feather file format
        # df.to_feather(path_out)

        print(round((timeit.default_timer() - start_time)/60.0, 2), 'min so far')

# Timestamp
elapsed = round((timeit.default_timer() - start_time)/60.0, 2)
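If you want to try the feather format mentioned in the comment above, a sketch is below (it assumes pyarrow or feather-format is installed and uses a hypothetical .feather output path); reset the index after concatenating before writing.
In [ ]:
# Feather needs a default integer index, so reset it after pd.concat.
# Hypothetical output path - the downstream notebooks read the csv files.
feather_path = join(data_path, 'EPA emissions',
                    'EPA emissions {}.feather'.format(str(year)))
df.reset_index(drop=True).to_feather(feather_path)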
In [ ]: