In [39]:
import os
DATA_FILE_CSV = "beatles-diskography.csv"
def parse_file(data_file):
    data = []
    row_count = 0
    with open(data_file) as f:
        header = f.readline().split(',')
        for line in f:
            if row_count >= 10:
                break
            fields = line.strip().split(',')
            row = {}
            for i, value in enumerate(fields):
                row[header[i].strip()] = value.strip()
            data.append(row)
            row_count += 1
    return data
In [40]:
d = parse_file(DATA_FILE_CSV)
d[0]
Out[40]:
In [41]:
def test(data):
    assert data[0] == {'BPI Certification': 'Gold',
                       'Label': 'Parlophone(UK)',
                       'RIAA Certification': 'Platinum',
                       'Released': '22 March 1963',
                       'Title': 'Please Please Me',
                       'UK Chart Position': '1',
                       'US Chart Position': '\xe2\x80\x94'}
    assert data[9] == {'BPI Certification': 'Gold',
                       'Label': 'Parlophone(UK)',
                       'RIAA Certification': '',
                       'Released': '10 July 1964',
                       'Title': '',
                       'UK Chart Position': '1',
                       'US Chart Position': '\xe2\x80\x94'}
In [42]:
test(d)
But there are many small things, quoted fields with embedded commas for example, that will cause problems if we try to write a CSV reader ourselves. So we will rewrite the above using Python's csv module.
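To see one of those small things concretely, here is a minimal sketch (using a made-up line, not data from the diskography file) showing how a quoted field containing a comma breaks a plain str.split, while csv.reader handles it correctly:

import csv
from StringIO import StringIO

# Hypothetical line: the quoted second field contains a comma.
line = 'Help!,"6 August 1965, UK",Parlophone'

print line.split(',')                    # ['Help!', '"6 August 1965', ' UK"', 'Parlophone'] - quoted field split in two
print next(csv.reader(StringIO(line)))   # ['Help!', '6 August 1965, UK', 'Parlophone'] - quoted field kept intact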
In [43]:
import csv
def parse_csv(data_file):
    data = []
    with open(data_file, 'rb') as sd:
        r = csv.DictReader(sd)
        for line in r:
            data.append(line)
    return data
In [44]:
test(parse_csv(DATA_FILE_CSV))
In [45]:
def read_sheet(sheet):
    return [[sheet.cell_value(r, col)
             for col in range(sheet.ncols)]
            for r in range(sheet.nrows)]
In [46]:
import xlrd
DATA_FILE_EXCEL = "2013_ERCOT_Hourly_Load_Data.xls"
def parse_excel_file(datafile):
    workbook = xlrd.open_workbook(datafile)
    sheet = workbook.sheet_by_index(0)
    data = read_sheet(sheet)

    print "\nList Comprehension"
    print "data[3][2]:",
    print data[3][2]

    print "\nCells in a nested loop:"
    for row in range(sheet.nrows):
        for col in range(sheet.ncols):
            if row == 50:
                print sheet.cell_value(row, col),

    ### other useful methods:
    print "\nROWS, COLUMNS, and CELLS:"
    print "Number of rows in the sheet:",
    print sheet.nrows
    print "Type of data in cell (row 3, col 2):",
    print sheet.cell_type(3, 2)
    print "Value in cell (row 3, col 2):",
    print sheet.cell_value(3, 2)
    print "Get a slice of values in column 3, from rows 1-3:"
    print sheet.col_values(3, start_rowx=1, end_rowx=4)

    print "\nDATES:"
    print "Type of data in cell (row 1, col 0):",
    print sheet.cell_type(1, 0)
    exceltime = sheet.cell_value(1, 0)
    print "Time in Excel format:",
    print exceltime
    print "Convert time to a Python datetime tuple, from the Excel float:",
    print xlrd.xldate_as_tuple(exceltime, 0)

    return data
In [47]:
data = parse_excel_file(DATA_FILE_EXCEL)
data[0:2]
Out[47]:
In [48]:
def data_for_column(sheet, column_index):
    return sheet.col_values(column_index, start_rowx=1, end_rowx=None)

def row_index_for_value_in_column(data, value):
    # Offset by 1 because data_for_column skips the header row
    return data.index(value) + 1

def cell_value_at_position(sheet, row, col):
    return sheet.cell_value(row, col)

def parse_excel_date(excel_date):
    return xlrd.xldate_as_tuple(excel_date, 0)

def get_date_for_row_containing_value(sheet, column_data, value):
    index = row_index_for_value_in_column(column_data, value)
    date = cell_value_at_position(sheet, index, 0)
    result = parse_excel_date(date)
    return result
In [49]:
def parse_file_13(datafile):
    workbook = xlrd.open_workbook(datafile)
    sheet = workbook.sheet_by_index(0)
    sheet_data = read_sheet(sheet)
    cv = data_for_column(sheet, 1)   # COAST load values
    max_data = max(cv)
    min_data = min(cv)
    # print sheet_data
    data = {
        'maxtime': get_date_for_row_containing_value(sheet, cv, max_data),
        'maxvalue': max_data,
        'mintime': get_date_for_row_containing_value(sheet, cv, min_data),
        'minvalue': min_data,
        'avgcoast': sum(cv) / float(len(cv))
    }
    return data
In [50]:
import pprint
data = parse_file_13(DATA_FILE_EXCEL)
pprint.pprint(data)
assert data['maxtime'] == (2013, 8, 13, 17, 0, 0)
assert round(data['maxvalue'], 10) == round(18779.02551, 10)
In [51]:
import json
import requests
BASE_URL = "http://musicbrainz.org/ws/2/"
ARTIST_URL = BASE_URL + "artist/"
# Query parameters are given to the requests.get function as a dictionary; this
# variable contains some starter parameters.
query_type = {"simple": {},
              "atr": {"inc": "aliases+tags+ratings"},
              "aliases": {"inc": "aliases"},
              "releases": {"inc": "releases"}}

def query_site(url, params, uid="", fmt="json"):
    # This is the main function for making queries to the MusicBrainz API.
    # A JSON document should be returned by the query.
    params["fmt"] = fmt
    r = requests.get(url + uid, params=params)
    print "requesting", r.url
    if r.status_code == requests.codes.ok:
        return r.json()
    else:
        r.raise_for_status()

def query_by_name(url, params, name):
    # This adds an artist name to the query parameters before making
    # an API call to the function above.
    params["query"] = "artist:" + name
    return query_site(url, params)

def pretty_print(data, indent=4):
    # After we get our output, we can format it to be more readable
    # by using this function.
    if type(data) == dict:
        print json.dumps(data, indent=indent, sort_keys=True)
    else:
        print data
In [52]:
def json_play():
    '''
    Modify the function calls and indexing below to answer the questions on
    the next quiz. HINT: Note how the output we get from the site is a
    multi-level JSON document, so try making print statements to step through
    the structure one level at a time or copy the output to a separate output
    file.
    '''
    results = query_by_name(ARTIST_URL, query_type["simple"], "Nirvana")
    print "All Results for Nirvana"
    pretty_print(results)

    artist_id = results["artists"][1]["id"]
    print "\nARTIST:"
    pretty_print(results["artists"][1])

    artist_data = query_site(ARTIST_URL, query_type["releases"], artist_id)
    releases = artist_data["releases"]
    print "\nONE RELEASE:"
    pretty_print(releases[0], indent=2)

    release_titles = [r["title"] for r in releases]
    print "\nALL TITLES:"
    for t in release_titles:
        print t
In [53]:
json_play()
In [54]:
def is_group(artist):
    return 'type' in artist and artist['type'].lower() == 'group'

def has_same_name(artist, name):
    return artist['name'].lower() == name.lower()

def band_by_name(name):
    results = query_by_name(ARTIST_URL, query_type["simple"], name)
    return filter(lambda x: is_group(x) and has_same_name(x, name), results['artists'])
In [55]:
# Number of bands with this name
len(band_by_name("FIRST AID KIT"))
Out[55]:
In [56]:
# Queen's begin-area name
band_by_name("queen")[0]['begin-area']['name']
Out[56]:
In [57]:
# Spanish alias for the Beatles
all_aliases = band_by_name('the beatles')[0]['aliases']
filter(lambda x: x['locale'] == 'es', all_aliases)[0]['name']
Out[57]:
In [58]:
# Disambiguation for the US band Nirvana
filter(lambda x: x['country'] == 'US',band_by_name('nirvana'))[0]['disambiguation']
Out[58]:
In [59]:
# When was One Direction formed?
band_by_name('one direction')[0]['life-span']['begin']
Out[59]:
Your task is to process the supplied file and use the csv module to extract data from it. The data comes from the NREL (National Renewable Energy Laboratory) website. Each file contains information from one meteorological station, in particular the amount of solar and wind energy for each hour of the day.
Note that the first line of the datafile is neither a data entry nor a header. It is a line describing the data source, and you should extract the name of the station from it.
The data should be returned as a list of lists (not dictionaries).
You can use the csv module's reader method to get the data in this format.
Another useful method is next(), which returns the next line from the iterator (see the sketch after this description).
You should only change the parse_file function.
The datafile in this exercise is a small subset of the full file for one of the stations. You can download it from the Downloadables section, or see the full data files for other stations on the National Solar Radiation Data Base.
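As a rough sketch of the next() approach mentioned above (illustrative only; the station name's position in row 0, column 1 matches the graded solution below, and the function name parse_file_with_next is made up):

import csv

def parse_file_with_next(datafile):
    with open(datafile, 'rb') as f:
        reader = csv.reader(f)
        first_line = next(reader)      # line describing the data source
        name = first_line[1]           # station name sits in the second column
        next(reader)                   # skip the header line
        data = [row for row in reader] # remaining rows as a list of lists
    return name, data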
In [60]:
import csv
import os
DATA_DIR = ""
DATA_FILE = "745090.csv"
def parse_file(datafile):
    name = ""
    data = []
    with open(datafile, 'rb') as f:
        reader = csv.reader(f)
        for i, row in enumerate(reader):
            if i == 0:
                name = row[1]      # first line describes the data source; station name is in column 1
            elif i == 1:
                pass               # second line is the column header; skip it
            else:
                data.append(row)
    # Do not change the line below
    return name, data
In [61]:
def test1():
    datafile = os.path.join(DATA_DIR, DATA_FILE)
    name, data = parse_file(datafile)
    assert name == "MOUNTAIN VIEW MOFFETT FLD NAS"
    assert data[0][1] == "01:00"
    assert data[2][0] == "01/01/2005"
    assert data[2][5] == "2"
In [62]:
test1()
Find the time and value of the maximum load for each of the regions COAST, EAST, FAR_WEST, NORTH, NORTH_C, SOUTHERN, SOUTH_C, and WEST, and write the result out to a CSV file, using the pipe character | as the delimiter.
An example output can be seen in the "example.csv" file.
See the csv module documentation on how to use different delimiters with csv.writer: http://docs.python.org/2/library/csv.html
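The save_file function below builds the pipe-delimited output by hand with string formatting. A hedged alternative sketch using csv.writer with delimiter='|' might look like the following (the name save_file_with_writer is illustrative; the (max_load, date) tuple layout matches what parse_file returns below):

import csv

def save_file_with_writer(data, filename):
    with open(filename, 'wb') as f:
        writer = csv.writer(f, delimiter='|')
        writer.writerow(["Station", "Year", "Month", "Day", "Hour", "Max Load"])
        for station, (max_load, date) in data.iteritems():
            # date is the (year, month, day, hour, minute, second) tuple from xldate_as_tuple
            writer.writerow([station, date[0], date[1], date[2], date[3], max_load])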
In [63]:
import xlrd
import os
import csv
DATA_FILE = "2013_ERCOT_Hourly_Load_Data.xls"
OUT_FILE = "2013_Max_Loads.csv"
def get_max_and_max_date_for_column(sheet, column_index):
    data = data_for_column(sheet, column_index)
    max_data = max(data)
    date = get_date_for_row_containing_value(sheet, data, max_data)
    return max_data, date

def parse_file(datafile):
    workbook = xlrd.open_workbook(datafile)
    sheet = workbook.sheet_by_index(0)
    return {
        'COAST': get_max_and_max_date_for_column(sheet, 1),
        'EAST': get_max_and_max_date_for_column(sheet, 2),
        'FAR_WEST': get_max_and_max_date_for_column(sheet, 3),
        'NORTH': get_max_and_max_date_for_column(sheet, 4),
        'NORTH_C': get_max_and_max_date_for_column(sheet, 5),
        'SOUTHERN': get_max_and_max_date_for_column(sheet, 6),
        'SOUTH_C': get_max_and_max_date_for_column(sheet, 7),
        'WEST': get_max_and_max_date_for_column(sheet, 8)
    }

def save_file(data, filename):
    result = ""
    with open(filename, 'w') as f:
        result += "Station|Year|Month|Day|Hour|Max Load\n"
        for key, value in data.iteritems():
            result += "{}|{}|{}|{}|{}|{}\n".format(
                key, value[1][0], value[1][1], value[1][2], value[1][3], value[0])
        result = result.strip("\n")
        f.write(result)
In [64]:
def test2():
    # open_zip(DATA_FILE)
    data = parse_file(DATA_FILE)
    save_file(data, OUT_FILE)

    number_of_rows = 0
    stations = []

    ans = {'FAR_WEST': {'Max Load': '2281.2722140000024',
                        'Year': '2013',
                        'Month': '6',
                        'Day': '26',
                        'Hour': '17'}}
    correct_stations = ['COAST', 'EAST', 'FAR_WEST', 'NORTH',
                        'NORTH_C', 'SOUTHERN', 'SOUTH_C', 'WEST']
    fields = ['Year', 'Month', 'Day', 'Hour', 'Max Load']

    with open(OUT_FILE) as of:
        csvfile = csv.DictReader(of, delimiter="|")
        for line in csvfile:
            station = line['Station']
            if station == 'FAR_WEST':
                for field in fields:
                    # Check if 'Max Load' is within .1 of answer
                    if field == 'Max Load':
                        max_answer = round(float(ans[station][field]), 1)
                        max_line = round(float(line[field]), 1)
                        assert max_answer == max_line
                    # Otherwise check for equality
                    else:
                        assert ans[station][field] == line[field]
            number_of_rows += 1
            stations.append(station)

    # Output should be 8 lines not including header
    assert number_of_rows == 8

    # Check station names
    assert set(stations) == set(correct_stations)
In [65]:
test2()
This exercise shows some important concepts that you should be aware of:
To run this code locally you have to register at the NYTimes developer site and get your own API key. You will be able to complete this exercise in our UI without doing so, as we have provided a sample result.
Your task is to process the saved file that represents the most popular articles (by view count) from the last day, and return the following data:
All your changes should be in the article_overview function. The rest of the functions are provided for your convenience, in case you want to access the API yourself.
If you want to know more, or query the site yourself, please read the NYTimes Developer Documentation for the Most Popular API and apply for your own API key for NY Times.
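Before writing article_overview, it can help to peek at the structure of the saved file. A minimal sketch, assuming popular-viewed-1.json is present locally (the 'media' and 'media-metadata' keys mirror those used in the solution code below; everything else is illustrative):

import json

with open("popular-viewed-1.json") as f:        # file produced by save_file("viewed", 1)
    articles = json.loads(f.read())

print len(articles)                              # number of saved articles
print sorted(articles[0].keys())                 # top-level fields of one article
for media in articles[0]['media']:               # step through the nested media -> media-metadata levels
    print [m['format'] for m in media['media-metadata']]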
In [66]:
import json
import codecs
import requests
URL_MAIN = "http://api.nytimes.com/svc/"
URL_POPULAR = URL_MAIN + "mostpopular/v2/"
API_KEY = { "popular": "",
"article": ""}
def get_from_file(kind, period):
filename = "popular-{0}-{1}.json".format(kind, period)
with open(filename, "r") as f:
return json.loads(f.read())
def article_overview(kind, period):
data = get_from_file(kind, period)
titles = []
urls = []
for row in data:
titles.append({row['section']: row['title']})
for media in row['media']:
for metadata in media['media-metadata']:
if metadata['format'] == 'Standard Thumbnail':
urls.append(metadata['url'])
return titles, urls
def query_site(url, target, offset):
# This will set up the query with the API key and offset
# Web services often use offset paramter to return data in small chunks
# NYTimes returns 20 articles per request, if you want the next 20
# You have to provide the offset parameter
if API_KEY["popular"] == "" or API_KEY["article"] == "":
print "You need to register for NYTimes Developer account to run this program."
print "See Intructor notes for information"
return False
params = {"api-key": API_KEY[target], "offset": offset}
r = requests.get(url, params = params)
if r.status_code == requests.codes.ok:
return r.json()
else:
r.raise_for_status()
def get_popular(url, kind, days, section="all-sections", offset=0):
# This function will construct the query according to the requirements of the site
# and return the data, or print an error message if called incorrectly
if days not in [1, 7, 30]:
print "Time period can be 1,7, 30 days only"
return False
if kind not in ["viewed", "shared", "emailed"]:
print "kind can be only one of viewed/shared/emailed"
return False
url += "most{0}/{1}/{2}.json".format(kind, section, days)
data = query_site(url, "popular", offset)
return data
def save_file(kind, period):
# This will process all results, by calling the API repeatedly with supplied offset value,
# combine the data and then write all results in a file.
data = get_popular(URL_POPULAR, "viewed", 1)
num_results = data["num_results"]
full_data = []
with codecs.open("popular-{0}-{1}.json".format(kind, period), encoding='utf-8', mode='w') as v:
for offset in range(0, num_results, 20):
data = get_popular(URL_POPULAR, kind, period, offset=offset)
full_data += data["results"]
v.write(json.dumps(full_data, indent=2))
In [67]:
def test3():
    titles, urls = article_overview("viewed", 1)
    assert len(titles) == 20
    assert len(urls) == 30
    assert titles[2] == {'Opinion': 'Professors, We Need You!'}
    assert urls[20] == 'http://graphics8.nytimes.com/images/2014/02/17/sports/ICEDANCE/ICEDANCE-thumbStandard.jpg'
In [68]:
test3()