In [ ]:
%matplotlib inline
import string
import itertools
import datetime as dt
import calendar
from collections import OrderedDict
from sortedcollections import ItemSortedDict
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from mpl_toolkits.basemap import Basemap
In [ ]:
HEADER_INDICATOR = '66666'
DATE_FORMAT = '%Y%m%d%H'
PLOTTED_DATE_FORMAT = '%d-%m-%y %H:%M'
LOCATION_PADDING = 20.0
LINE_INTERVAL = 20.0
LATITUDE_OFFSET = 0.0
LONGITUDE_OFFSET = 0.0
PLOT_DIRECTORY = 'plots'
In [ ]:
class IndexableDict(OrderedDict):
def __init__(self, *args, **kwargs):
OrderedDict.__init__(self, *args, **kwargs)
def _type_check(self, index):
if not isinstance(index, int):
raise TypeError('indices must be integers, not {}'.format(type(index)))
if index < 0 or index <= len(self):
raise IndexError('index out of range')
def _get_key(self, index, checked=False):
if not checked:
self._type_check(index)
return next(itertools.islice(self, index, index + 1))
def _get_value(self, index, checked=False):
if not checked:
self._type_check(index)
return next(itertools.islice(self.values(), index, index + 1))
def get_key(self, index):
return _get_key(self, index)
def get_value(self, index):
return _get_value(self, index)
def get_pair(self, index):
return (self._get_key(self, index), self._get_value(self, index, checked=True))
In [ ]:
class Typhoon():
def __init__(self):
self._dict = {}
self._dict['name'] = 'N/A'
self._dict['category'] = -1
self._dict['dates'] = []
self._dict['latitudes'] = []
self._dict['longitudes'] = []
self._dict['pressures'] = []
self._dict['wind_speeds'] = []
def get_name(self):
return self._dict['name']
def set_name(self, name):
self._dict['name'] = name if name else 'N/A'
def get_category(self):
return self._dict['category']
def set_category(self, category):
self._dict['category'] = -1 if category == None else category
def get_dates(self):
return self._dict['dates']
def set_dates(self, dates):
self._dict['dates'] = dates
def get_latitudes(self):
return self._dict['latitudes']
def set_latitudes(self, latitudes):
self._dict['latitudes'] = latitudes
def get_longitudes(self):
return self._dict['longitudes']
def set_longitudes(self, longitudes):
self._dict['longitudes'] = longitudes
def get_pressures(self):
return self._dict['pressures']
def set_pressures(self, pressures):
self._dict['pressures'] = pressures
def get_wind_speeds(self):
return self._dict['wind_speeds']
def set_wind_speeds(self, wind_speeds):
self._dict['wind_speeds'] = wind_speeds
In [ ]:
class CycloneScale():
scales = {
'Saffir-Simpson': {
-1: ('N/A', '#000000'),
0: ('Tropical Depression', '#0066ff'),
1: ('Tropical Storm', '#00ccff') ,
2: ('Category One', '#ffcccc'),
3: ('Category Two', '#ff9999'),
4: ('Category Three', '#ff6666'),
5: ('Category Four', '#ff3333'),
6: ('Category Five', '#ff0000'),
'min_knots': [0, 34, 64, 83, 96, 113, 137, float('inf')]
},
'RSMC Tokyo': {
-1: ('N/A', '#000000'),
0: ('Tropical Depression', '#0066ff'),
1: ('Tropical Storm', '#00ccff') ,
2: ('Severe Tropical Storm', '#ffcccc'),
3: ('Typhoon', '#ff9999'),
4: ('Very Strong Typhoon', '#ff6666'),
5: ('Violent Typhoon', '#ff3333'),
'min_knots': [0, 34, 48, 64, 85, 105, float('inf')]
}
}
scale = 'Saffir-Simpson'
@classmethod
def get_category_num(cls, obj):
max_wind_speed = -1
if isinstance(obj, Typhoon):
if not obj.get_wind_speeds():
return -1
max_wind_speed = sum(obj.get_wind_speeds()) / len(obj.get_wind_speeds())
elif isinstance(obj, list):
if not obj:
return -1
max_wind_speed = sum(obj) / len(obj)
elif isinstance(obj, int):
max_wind_speed = obj
else:
raise TypeError('invalid type supplied; no wind speed(s) found')
if max_wind_speed < 0:
raise ValueError('the maximum wind speed cannot be negative')
min_knots = cls.scales[cls.scale]['min_knots'][:]
for n in range(len(min_knots) - 1):
if min_knots[n] <= max_wind_speed < min_knots[n + 1]:
return n
@classmethod
def get_classification(cls, category_num):
return cls.scales[cls.scale][category_num][0]
@classmethod
def get_color(cls, category_num):
return cls.scales[cls.scale][category_num][1]
@classmethod
def set_scale(cls, scale):
cls.scale = scale
In [ ]:
def format_date(date_string):
return dt.datetime.strptime(('19' if 18 <= int(date_string[:2]) <= 99 else '20') + date_string, DATE_FORMAT)
def load_typhoons(file):
with open(file) as data:
typhoons = IndexableDict()
typhoon = None
for line in data:
parsed = []
parsed.append(line[:5])
if parsed[0] == HEADER_INDICATOR:
if typhoon:
typhoon.set_category(CycloneScale.get_category_num(typhoon))
parsed.append(line[6:10])
parsed.append(line[12:15])
parsed.append(line[16:20])
parsed.append(line[21:25])
parsed.append(line[26])
parsed.append(line[28])
parsed.append(line[30:50])
parsed.append(line[64:72])
typhoon = Typhoon()
typhoons[int(parsed[1])] = typhoon
typhoon.set_name(parsed[7].strip().capitalize())
continue
parsed[0] += line[5:8]
parsed.append(line[9:12])
parsed.append(line[13])
parsed.append(line[15:18])
parsed.append(line[19:23])
parsed.append(line[24:28])
parsed.append(line[33:36])
parsed.append(line[41])
parsed.append(line[42:46])
parsed.append(line[47:51])
parsed.append(line[52])
parsed.append(line[53:57])
parsed.append(line[58:62])
parsed.append(line[71])
typhoon.get_dates().append(format_date(parsed[0]))
typhoon.get_latitudes().append(int(parsed[3].replace(' ', '0')) / 10)
typhoon.get_longitudes().append(int(parsed[4].replace(' ', '0')) / 10)
typhoon.get_pressures().append(int(parsed[5].replace(' ', '0')))
typhoon.get_wind_speeds().append(int(parsed[6].replace(' ', '0')))
return typhoons
def filter_typhoons(from_date=None, to_date=None, from_date_string=None, to_date_string=None, date_format=None, category=None):
if not from_date:
from_date = dt.datetime.min
if not to_date:
to_date = dt.datetime.max
if date_format:
if from_date_string:
from_date = dt.datetime.strptime(from_date_string, date_format)
else:
from_date_string = from_date.strftime(date_format)
if to_date_string:
to_date = dt.datetime.strptime(to_date_string, date_format)
else:
to_date_string = to_date.strftime(date_format)
filtered_typhoons = []
latitudes = np.empty(0)
longitudes = np.empty(0)
n = 0
for typhoon in typhoons.values():
n += 1
if from_date <= typhoon.get_dates()[0] <= to_date and (category == None or category == typhoon.get_category()):
filtered_typhoons.append(typhoon)
latitudes = np.concatenate((latitudes, np.array(typhoon.get_latitudes())))
longitudes = np.concatenate((longitudes, np.array(typhoon.get_longitudes())))
return (filtered_typhoons, latitudes, longitudes, from_date, to_date, from_date_string, to_date_string)
def plot_all_typhoon_paths(from_date=None, to_date=None, from_date_string=None, to_date_string=None, date_format=None, category=None, background=None, year=None, month=None, plot=True, label=True, save=False):
(filtered_typhoons, latitudes, longitudes, from_date, to_date, from_date_string, to_date_string) = filter_typhoons(from_date, to_date, from_date_string, to_date_string, date_format, category)
if not plot or not filtered_typhoons:
return len(filtered_typhoons)
latitudes = sorted(latitudes)
longitudes = sorted(longitudes)
min_latitude = latitudes[0] - LOCATION_PADDING
max_latitude = latitudes[-1] + LOCATION_PADDING
min_longitude = longitudes[0] - LOCATION_PADDING
max_longitude = longitudes[-1] + LOCATION_PADDING
fig = plt.figure(figsize=(15,9))
axes = fig.add_subplot(1, 1, 1)
mapping = Basemap(llcrnrlon=min_longitude, llcrnrlat=min_latitude, urcrnrlon=max_longitude, urcrnrlat=max_latitude, projection='cyl', lat_0=(min_latitude+max_latitude)/2.0, lon_0=(min_longitude+max_longitude)/2.0, resolution='l', area_thresh=1000.0)
for typhoon in filtered_typhoons[:-1]:
(x, y) = mapping(typhoon.get_longitudes(), typhoon.get_latitudes())
mapping.plot(x, y, CycloneScale.get_color(typhoon.get_category()), linewidth=1.5)
mapping.plot(x[0], y[0], 'g^', markersize=8)
mapping.plot(x[-1], y[-1], 'r^', markersize=8)
(x, y) = mapping(filtered_typhoons[-1].get_longitudes(), filtered_typhoons[-1].get_latitudes())
mapping.plot(x, y, CycloneScale.get_color(filtered_typhoons[-1].get_category()), linewidth=1.5)
mapping.plot(x[0], y[0], 'g^', markersize=8, label='start')
mapping.plot(x[-1], y[-1], 'r^', markersize=8, label='end')
mapping.drawcoastlines()
mapping.drawcountries()
mapping.drawparallels(np.arange(min_latitude + LATITUDE_OFFSET, max_latitude + LATITUDE_OFFSET, LINE_INTERVAL), labels=[1,1,0,0])
mapping.drawmeridians(np.arange(min_longitude + LONGITUDE_OFFSET, max_longitude + LONGITUDE_OFFSET, LINE_INTERVAL), labels=[0,0,0,1])
if background:
getattr(mapping, background)()
else:
mapping.drawmapboundary(fill_color='blanchedalmond')
mapping.fillcontinents(color='pink', lake_color='blanchedalmond')
if year and month:
if label:
axes.set_title('Paths of All Typhoons Beginning in {} {}'.format(calendar.month_abbr[month], year))
if save:
plt.savefig('{}/typhoons_{}-{}.png'.format(PLOT_DIRECTORY, year, month if 10 <= month <= 12 else '0' + str(month)), transparent=True, bbox_inches='tight')
else:
if category == None:
if label:
axes.set_title('Paths of All Typhoons Beginning From {} to {}'.format(from_date.year, to_date.year))
if save:
plt.savefig('{}/typhoons_{}_to_{}.png'.format(PLOT_DIRECTORY, from_date_string.replace(' ', '_').replace(':', ''), to_date_string.replace(' ', '_').replace(':', '')), transparent=True, bbox_inches='tight')
else:
if label:
axes.set_title('Paths of All {} Typhoons Beginning From {} to {}'.format(CycloneScale.get_classification(category), from_date.year, to_date.year))
if save:
plt.savefig('{}/typhoons_{}_{}_to_{}.png'.format(PLOT_DIRECTORY, CycloneScale.get_classification(category).lower().replace(' ', '_'), from_date_string.replace(' ', '_').replace(':', ''), to_date_string.replace(' ', '_').replace(':', '')), transparent=True, bbox_inches='tight')
plt.legend()
plt.show()
return len(filtered_typhoons)
def plot_all_typhoon_paths_per_month(year, **kwargs):
return [plot_all_typhoon_paths(from_date=dt.datetime(year, month, 1), to_date=dt.datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59, 999999), year=year, month=month, **kwargs) for month in range(1, 13)]
def plot_all_typhoon_paths_per_year(**kwargs):
return plot_all_typhoon_paths(from_date=dt.datetime(1951, 1, 1), to_date=dt.datetime(2018, 1, 1)-dt.timedelta(microseconds=1), date_format=PLOTTED_DATE_FORMAT)
In [ ]:
typhoons = load_typhoons('bst_all.txt')
In [ ]:
save = False
frequencies = {}
# Load storm frequencies
for year in range(1951, 2017):
frequencies[year] = plot_all_typhoon_paths_per_month(year, plot=False)
# Plot each separate category of typhoons from 1951-2016
for n in range(-1, 7):
print(plot_all_typhoon_paths(from_date=dt.datetime(1951, 1, 1), to_date=dt.datetime(2017, 1, 1)-dt.timedelta(microseconds=1), date_format=PLOTTED_DATE_FORMAT, category=n, label=False, save=save))
# Plot all categories of typhoons from 1951-2016
print(plot_all_typhoon_paths(from_date=dt.datetime(1951, 1, 1), to_date=dt.datetime(2017, 1, 1)-dt.timedelta(microseconds=1), date_format=PLOTTED_DATE_FORMAT, label=False, save=save))
# Plot all categories of typhoons in 2016
print(plot_all_typhoon_paths(from_date=dt.datetime(2016, 1, 1), to_date=dt.datetime(2017, 1, 1)-dt.timedelta(microseconds=1), date_format=PLOTTED_DATE_FORMAT, label=False, save=save))
# Plot each separate month of all categories of typhoons from 1951-2016
print(plot_all_typhoon_paths_per_month(2016, date_format=PLOTTED_DATE_FORMAT, label=False, save=save))
year = 2016
# Plot the frequency of typhoons per month in 2016
plt.plot(np.arange(12), frequencies[year])
plt.xticks(np.arange(12), calendar.month_abbr[1:13])
# plt.title('Frequency of Typhoons in {}'.format(year), fontsize=30)
plt.xlabel('Month', fontsize=20)
plt.ylabel('Number of Typhoons', fontsize=20)
if save:
plt.savefig('plots/storm_freq_{}.png'.format(year), transparent=True, bbox_inches='tight')
plt.show()
# Plot the frequency of typhoons per month from 1951-2016
for year in np.arange(1951, 2017):
plt.plot(np.arange(12), frequencies[year])
plt.xticks(np.arange(12), calendar.month_abbr[1:13])
# plt.title('Frequency of Typhoons in {}'.format(year), fontsize=30)
plt.xlabel('Month', fontsize=20)
plt.ylabel('Number of Typhoons', fontsize=20)
if save:
plt.savefig('plots/storm_freq_all_{}.png'.format(year), transparent=True, bbox_inches='tight')
plt.show()
# Plot the frequency of typhoons per year from 1951-2016
plt.plot(np.arange(1951, 2017), [sum(frequencies[year]) for year in np.arange(1951, 2017)])
plt.plot(np.arange(1951, 2017), np.poly1d(np.polyfit(np.arange(1951, 2017), [sum(frequencies[year]) for year in np.arange(1951, 2017)], 2))(np.arange(1951, 2017)))
# plt.title('Frequency of Typhoons From 1951 to 2016', fontsize=30)
plt.xlabel('Year', fontsize=20)
plt.ylabel('Number of Typhoons', fontsize=20)
if save:
plt.savefig('plots/storm_freq_1951-2016_years.png'.format(year), transparent=True, bbox_inches='tight')
plt.show()