In [ ]:
%matplotlib inline

import string
import itertools
import datetime as dt
import calendar
from collections import OrderedDict
from sortedcollections import ItemSortedDict

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from mpl_toolkits.basemap import Basemap

In [ ]:
HEADER_INDICATOR = '66666'
DATE_FORMAT = '%Y%m%d%H'
PLOTTED_DATE_FORMAT = '%d-%m-%y %H:%M'
LOCATION_PADDING = 20.0
LINE_INTERVAL = 20.0
LATITUDE_OFFSET = 0.0
LONGITUDE_OFFSET = 0.0
PLOT_DIRECTORY = 'plots'

In [ ]:
class IndexableDict(OrderedDict):
    def __init__(self, *args, **kwargs):
        OrderedDict.__init__(self, *args, **kwargs)
    
    def _type_check(self, index):
        if not isinstance(index, int):
            raise TypeError('indices must be integers, not {}'.format(type(index)))
            
        if index < 0 or index <= len(self):
            raise IndexError('index out of range')
    
    def _get_key(self, index, checked=False):
        if not checked:
            self._type_check(index)
        
        return next(itertools.islice(self, index, index + 1))
    
    def _get_value(self, index, checked=False):
        if not checked:
            self._type_check(index)
        
        return next(itertools.islice(self.values(), index, index + 1))
    
    def get_key(self, index):
        return _get_key(self, index)
    
    def get_value(self, index):
        return _get_value(self, index)
    
    def get_pair(self, index):
        return (self._get_key(self, index), self._get_value(self, index, checked=True))

In [ ]:
class Typhoon():
    def __init__(self):
        self._dict = {}
        self._dict['name'] = 'N/A'
        self._dict['category'] = -1
        self._dict['dates'] = []
        self._dict['latitudes'] = []
        self._dict['longitudes'] = []
        self._dict['pressures'] = []
        self._dict['wind_speeds'] = []
    
    def get_name(self):
        return self._dict['name']
    
    def set_name(self, name):
        self._dict['name'] = name if name else 'N/A'
    
    def get_category(self):
        return self._dict['category']
    
    def set_category(self, category):
        self._dict['category'] = -1 if category == None else category
    
    def get_dates(self):
        return self._dict['dates']
    
    def set_dates(self, dates):
        self._dict['dates'] = dates
    
    def get_latitudes(self):
        return self._dict['latitudes']
    
    def set_latitudes(self, latitudes):
        self._dict['latitudes'] = latitudes
    
    def get_longitudes(self):
        return self._dict['longitudes']
    
    def set_longitudes(self, longitudes):
        self._dict['longitudes'] = longitudes
    
    def get_pressures(self):
        return self._dict['pressures']
    
    def set_pressures(self, pressures):
        self._dict['pressures'] = pressures
    
    def get_wind_speeds(self):
        return self._dict['wind_speeds']
    
    def set_wind_speeds(self, wind_speeds):
        self._dict['wind_speeds'] = wind_speeds

In [ ]:
class CycloneScale():
    scales = {
        'Saffir-Simpson': {
            -1: ('N/A', '#000000'),
            0: ('Tropical Depression', '#0066ff'),
            1: ('Tropical Storm', '#00ccff') ,
            2: ('Category One', '#ffcccc'),
            3: ('Category Two', '#ff9999'),
            4: ('Category Three', '#ff6666'),
            5: ('Category Four', '#ff3333'),
            6: ('Category Five', '#ff0000'),
            'min_knots': [0, 34, 64, 83, 96, 113, 137, float('inf')]
        },
        
        'RSMC Tokyo': {
            -1: ('N/A', '#000000'),
            0: ('Tropical Depression', '#0066ff'),
            1: ('Tropical Storm', '#00ccff') ,
            2: ('Severe Tropical Storm', '#ffcccc'),
            3: ('Typhoon', '#ff9999'),
            4: ('Very Strong Typhoon', '#ff6666'),
            5: ('Violent Typhoon', '#ff3333'),
            'min_knots': [0, 34, 48, 64, 85, 105, float('inf')]
        }
    }
    
    scale = 'Saffir-Simpson'
    
    @classmethod
    def get_category_num(cls, obj):
        max_wind_speed = -1
        
        if isinstance(obj, Typhoon):
            if not obj.get_wind_speeds():
                return -1
            
            max_wind_speed = sum(obj.get_wind_speeds()) / len(obj.get_wind_speeds())
        
        elif isinstance(obj, list):
            if not obj:
                return -1
            
            max_wind_speed = sum(obj) / len(obj)
        
        elif isinstance(obj, int):
            max_wind_speed = obj
        
        else:
            raise TypeError('invalid type supplied; no wind speed(s) found')
        
        if max_wind_speed < 0:
            raise ValueError('the maximum wind speed cannot be negative')
    
        min_knots = cls.scales[cls.scale]['min_knots'][:]
        
        for n in range(len(min_knots) - 1):
            if min_knots[n] <= max_wind_speed < min_knots[n + 1]:
                return n
    
    @classmethod
    def get_classification(cls, category_num):
        return cls.scales[cls.scale][category_num][0]
    
    @classmethod
    def get_color(cls, category_num):
        return cls.scales[cls.scale][category_num][1]
    
    @classmethod
    def set_scale(cls, scale):
        cls.scale = scale

In [ ]:
def format_date(date_string):
    return dt.datetime.strptime(('19' if 18 <= int(date_string[:2]) <= 99 else '20') + date_string, DATE_FORMAT)

def load_typhoons(file):
    with open(file) as data:
        typhoons = IndexableDict()
        typhoon = None
        
        for line in data:
            parsed = []
            parsed.append(line[:5])
            
            if parsed[0] == HEADER_INDICATOR:
                if typhoon:
                    typhoon.set_category(CycloneScale.get_category_num(typhoon))
                
                parsed.append(line[6:10])
                parsed.append(line[12:15])
                parsed.append(line[16:20])
                parsed.append(line[21:25])
                parsed.append(line[26])
                parsed.append(line[28])
                parsed.append(line[30:50])
                parsed.append(line[64:72])
                
                typhoon = Typhoon()
                typhoons[int(parsed[1])] = typhoon
                typhoon.set_name(parsed[7].strip().capitalize())
                
                continue
            
            parsed[0] += line[5:8]
            parsed.append(line[9:12])
            parsed.append(line[13])
            parsed.append(line[15:18])
            parsed.append(line[19:23])
            parsed.append(line[24:28])
            parsed.append(line[33:36])
            parsed.append(line[41])
            parsed.append(line[42:46])
            parsed.append(line[47:51])
            parsed.append(line[52])
            parsed.append(line[53:57])
            parsed.append(line[58:62])
            parsed.append(line[71])
            
            typhoon.get_dates().append(format_date(parsed[0]))
            typhoon.get_latitudes().append(int(parsed[3].replace(' ', '0')) / 10)
            typhoon.get_longitudes().append(int(parsed[4].replace(' ', '0')) / 10)
            typhoon.get_pressures().append(int(parsed[5].replace(' ', '0')))
            typhoon.get_wind_speeds().append(int(parsed[6].replace(' ', '0')))
        
        return typhoons

def filter_typhoons(from_date=None, to_date=None, from_date_string=None, to_date_string=None, date_format=None, category=None):
    if not from_date:
        from_date = dt.datetime.min
    
    if not to_date:
        to_date = dt.datetime.max
        
    if date_format:
        if from_date_string:
            from_date = dt.datetime.strptime(from_date_string, date_format)
        
        else:
            from_date_string = from_date.strftime(date_format)
        
        if to_date_string:
            to_date = dt.datetime.strptime(to_date_string, date_format)
        
        else:
            to_date_string = to_date.strftime(date_format)
    
    filtered_typhoons = []
    latitudes = np.empty(0)
    longitudes = np.empty(0)

    n = 0
    for typhoon in typhoons.values():
        n += 1
        if from_date <= typhoon.get_dates()[0] <= to_date and (category == None or category == typhoon.get_category()):
            filtered_typhoons.append(typhoon)
            latitudes = np.concatenate((latitudes, np.array(typhoon.get_latitudes())))
            longitudes = np.concatenate((longitudes, np.array(typhoon.get_longitudes())))
    
    return (filtered_typhoons, latitudes, longitudes, from_date, to_date, from_date_string, to_date_string)

def plot_all_typhoon_paths(from_date=None, to_date=None, from_date_string=None, to_date_string=None, date_format=None, category=None, background=None, year=None, month=None, plot=True, label=True, save=False):
    (filtered_typhoons, latitudes, longitudes, from_date, to_date, from_date_string, to_date_string) = filter_typhoons(from_date, to_date, from_date_string, to_date_string, date_format, category)
    if not plot or not filtered_typhoons:
        return len(filtered_typhoons)
    
    latitudes = sorted(latitudes)
    longitudes = sorted(longitudes)
    min_latitude = latitudes[0] - LOCATION_PADDING
    max_latitude = latitudes[-1] + LOCATION_PADDING
    min_longitude = longitudes[0] - LOCATION_PADDING
    max_longitude = longitudes[-1] + LOCATION_PADDING

    fig = plt.figure(figsize=(15,9))
    axes = fig.add_subplot(1, 1, 1)
    mapping = Basemap(llcrnrlon=min_longitude, llcrnrlat=min_latitude, urcrnrlon=max_longitude, urcrnrlat=max_latitude, projection='cyl', lat_0=(min_latitude+max_latitude)/2.0, lon_0=(min_longitude+max_longitude)/2.0, resolution='l', area_thresh=1000.0)

    for typhoon in filtered_typhoons[:-1]:
        (x, y) = mapping(typhoon.get_longitudes(), typhoon.get_latitudes())
        mapping.plot(x, y, CycloneScale.get_color(typhoon.get_category()), linewidth=1.5)
        mapping.plot(x[0], y[0], 'g^', markersize=8)
        mapping.plot(x[-1], y[-1], 'r^', markersize=8)

    (x, y) = mapping(filtered_typhoons[-1].get_longitudes(), filtered_typhoons[-1].get_latitudes())
    mapping.plot(x, y, CycloneScale.get_color(filtered_typhoons[-1].get_category()), linewidth=1.5)
    mapping.plot(x[0], y[0], 'g^', markersize=8, label='start')
    mapping.plot(x[-1], y[-1], 'r^', markersize=8, label='end')

    mapping.drawcoastlines()
    mapping.drawcountries()
    mapping.drawparallels(np.arange(min_latitude + LATITUDE_OFFSET, max_latitude + LATITUDE_OFFSET, LINE_INTERVAL), labels=[1,1,0,0])
    mapping.drawmeridians(np.arange(min_longitude + LONGITUDE_OFFSET, max_longitude + LONGITUDE_OFFSET, LINE_INTERVAL), labels=[0,0,0,1])

    if background:
        getattr(mapping, background)()

    else:
        mapping.drawmapboundary(fill_color='blanchedalmond')
        mapping.fillcontinents(color='pink', lake_color='blanchedalmond')

    if year and month:
        if label:
            axes.set_title('Paths of All Typhoons Beginning in {} {}'.format(calendar.month_abbr[month], year))
        
        if save:
            plt.savefig('{}/typhoons_{}-{}.png'.format(PLOT_DIRECTORY, year, month if 10 <= month <= 12 else '0' + str(month)), transparent=True, bbox_inches='tight')

    else:
        if category == None:
            if label:
                axes.set_title('Paths of All Typhoons Beginning From {} to {}'.format(from_date.year, to_date.year))

            if save:
                plt.savefig('{}/typhoons_{}_to_{}.png'.format(PLOT_DIRECTORY, from_date_string.replace(' ', '_').replace(':', ''), to_date_string.replace(' ', '_').replace(':', '')), transparent=True, bbox_inches='tight')
        
        else:
            if label:
                axes.set_title('Paths of All {} Typhoons Beginning From {} to {}'.format(CycloneScale.get_classification(category), from_date.year, to_date.year))

            if save:
                plt.savefig('{}/typhoons_{}_{}_to_{}.png'.format(PLOT_DIRECTORY, CycloneScale.get_classification(category).lower().replace(' ', '_'), from_date_string.replace(' ', '_').replace(':', ''), to_date_string.replace(' ', '_').replace(':', '')), transparent=True, bbox_inches='tight')
        

    plt.legend()
    plt.show()
        
    return len(filtered_typhoons)

def plot_all_typhoon_paths_per_month(year, **kwargs):
    return [plot_all_typhoon_paths(from_date=dt.datetime(year, month, 1), to_date=dt.datetime(year, month, calendar.monthrange(year, month)[1], 23, 59, 59, 999999), year=year, month=month, **kwargs) for month in range(1, 13)]

def plot_all_typhoon_paths_per_year(**kwargs):
    return plot_all_typhoon_paths(from_date=dt.datetime(1951, 1, 1), to_date=dt.datetime(2018, 1, 1)-dt.timedelta(microseconds=1), date_format=PLOTTED_DATE_FORMAT)

In [ ]:
typhoons = load_typhoons('bst_all.txt')

In [ ]:
save = False
frequencies = {}

# Load storm frequencies
for year in range(1951, 2017):
    frequencies[year] = plot_all_typhoon_paths_per_month(year, plot=False)

# Plot each separate category of typhoons from 1951-2016
for n in range(-1, 7):
    print(plot_all_typhoon_paths(from_date=dt.datetime(1951, 1, 1), to_date=dt.datetime(2017, 1, 1)-dt.timedelta(microseconds=1), date_format=PLOTTED_DATE_FORMAT, category=n, label=False, save=save))

# Plot all categories of typhoons from 1951-2016
print(plot_all_typhoon_paths(from_date=dt.datetime(1951, 1, 1), to_date=dt.datetime(2017, 1, 1)-dt.timedelta(microseconds=1), date_format=PLOTTED_DATE_FORMAT, label=False, save=save))

# Plot all categories of typhoons in 2016
print(plot_all_typhoon_paths(from_date=dt.datetime(2016, 1, 1), to_date=dt.datetime(2017, 1, 1)-dt.timedelta(microseconds=1), date_format=PLOTTED_DATE_FORMAT, label=False, save=save))

# Plot each separate month of all categories of typhoons from 1951-2016
print(plot_all_typhoon_paths_per_month(2016, date_format=PLOTTED_DATE_FORMAT, label=False, save=save))

year = 2016

# Plot the frequency of typhoons per month in 2016
plt.plot(np.arange(12), frequencies[year])
plt.xticks(np.arange(12), calendar.month_abbr[1:13])
# plt.title('Frequency of Typhoons in {}'.format(year), fontsize=30)
plt.xlabel('Month', fontsize=20)
plt.ylabel('Number of Typhoons', fontsize=20)
if save:
    plt.savefig('plots/storm_freq_{}.png'.format(year), transparent=True, bbox_inches='tight')

plt.show()

# Plot the frequency of typhoons per month from 1951-2016
for year in np.arange(1951, 2017):
    plt.plot(np.arange(12), frequencies[year])

plt.xticks(np.arange(12), calendar.month_abbr[1:13])
# plt.title('Frequency of Typhoons in {}'.format(year), fontsize=30)
plt.xlabel('Month', fontsize=20)
plt.ylabel('Number of Typhoons', fontsize=20)
if save:
    plt.savefig('plots/storm_freq_all_{}.png'.format(year), transparent=True, bbox_inches='tight')

plt.show()

# Plot the frequency of typhoons per year from 1951-2016
plt.plot(np.arange(1951, 2017), [sum(frequencies[year]) for year in np.arange(1951, 2017)])
plt.plot(np.arange(1951, 2017), np.poly1d(np.polyfit(np.arange(1951, 2017), [sum(frequencies[year]) for year in np.arange(1951, 2017)], 2))(np.arange(1951, 2017)))
# plt.title('Frequency of Typhoons From 1951 to 2016', fontsize=30)
plt.xlabel('Year', fontsize=20)
plt.ylabel('Number of Typhoons', fontsize=20)
if save:
    plt.savefig('plots/storm_freq_1951-2016_years.png'.format(year), transparent=True, bbox_inches='tight')

plt.show()