In [1]:
%matplotlib inline
# Hide info messages from paramiko
import logging
logging.basicConfig()
logger = logging.getLogger()
# WARN and above only; paramiko logs SSH connection chatter at INFO.
logger.setLevel(logging.WARN)
import time
import random
import threading
import pandas as pd
import numpy as np
# NOTE(review): `plotly.plotly` was split into the `chart_studio` package in
# plotly 4.x -- this import only works on plotly < 4; confirm pinned version.
import plotly.plotly as py
import plotly.graph_objs as go
import matplotlib.pyplot as plt
# Small thumbnails for the digit previews below.
plt.rcParams['figure.figsize'] = (2, 2)
from distributed import progress, Client
from pprint import pprint
from sklearn.decomposition import PCA
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import cross_val_score
from sklearn.datasets import load_digits
# cclyde drives the AWS cluster provisioning used throughout this notebook.
from cclyde.cluster import Cluster
In [2]:
# Provision a 16-node AWS cluster of t2.medium instances via cclyde.
cluster = Cluster(key_name='default_windows', n_nodes=16, cluster_name='default', instance_type='t2.medium')
In [3]:
# Prepare the cluster configuration (presumably keys/security groups --
# confirm against cclyde docs).
cluster.configure()
In [4]:
# Launch the EC2 instances in a background thread so the notebook can keep
# running; the thread's liveness is checked below before using the cluster.
cluster.launch_instances_nonblocking()
In [5]:
# Load the digits dataset and flatten each image into a 64-element feature
# vector, then preview the first three samples.
X, y = load_digits(return_X_y=True)
X = np.asarray([np.ravel(sample) for sample in X])
for sample_idx in range(3):
    plt.imshow(X[sample_idx].reshape((8, 8)), cmap='Greys_r')
    plt.title('Digit: {}'.format(y[sample_idx]))
    plt.show()
In [6]:
# Reduce the 64 raw pixel features to 30 principal components.
# (Python 2 `print` statements replaced with `.format`-based print() calls,
# which work on both Python 2 and 3 and match the rest of the notebook.)
pca = PCA(n_components=30)
print('Features before: {}'.format(X.shape[1]))
X = pca.fit_transform(X)
print('Features after: {}'.format(X.shape[1]))
print('{}% Explained Variance'.format(round(sum(pca.explained_variance_ratio_) * 100, 1)))
Train a first model with some hand-picked hyperparameters...
In [7]:
# Baseline: a small 2-layer MLP trained with SGD, scored with 5-fold CV.
lr = MLPClassifier(hidden_layer_sizes=(10, 5), batch_size=10,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)
start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X,
                         y=y,
                         cv=5)
# Report mean accuracy +/- two standard deviations. Double the std BEFORE
# rounding (the original rounded first, which distorts the interval).
print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std() * 2, 3)))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))
Alright, let's try a different configuration...
In [8]:
# Second try: two 10-neuron layers with a larger batch size.
lr = MLPClassifier(hidden_layer_sizes=(10, 10,), batch_size=100,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)
start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X,
                         y=y,
                         cv=5)
# Double the std BEFORE rounding so the +/- interval is reported correctly.
print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std() * 2, 3)))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))
And now something different from that...
In [9]:
# Third try: three 10-neuron layers.
lr = MLPClassifier(hidden_layer_sizes=(10, 10, 10,), batch_size=100,
                   solver='sgd', learning_rate_init=0.01, early_stopping=True)
start = time.time()
scores = cross_val_score(estimator=lr,
                         X=X,
                         y=y,
                         cv=5)
# Double the std BEFORE rounding so the +/- interval is reported correctly.
print("\nAccuracy: {}% (+/- {})".format(round(scores.mean() * 100, 2), round(scores.std() * 2, 3)))
print('Finished in {}sec\n'.format(round(time.time() - start, 2)))
In [10]:
# Define hyper-parameter ranges.
batch_sizes = np.linspace(start=5, stop=750, num=50, dtype=np.int64)
n_layers = range(1, 8, 1)

# Use a local, seeded RandomState so the generated grid is reproducible on
# Restart & Run All without touching the global numpy RNG.
rng = np.random.RandomState(42)

# Build every batch-size x depth combination; each combination gets a random
# (but reproducible) neuron count shared by all of its layers.
params = []
for batch_size in batch_sizes:
    for n_layer in n_layers:
        n_neuron = rng.randint(low=5, high=200)
        params.append({'batch_size': batch_size,
                       'hidden_layer_sizes': tuple(n_neuron for _ in range(n_layer)),
                       'solver': 'sgd',
                       'learning_rate_init': 0.01,
                       'early_stopping': True
                       })

print('{} different combinations.'.format(len(params)))
pprint(params[:2])
In [11]:
# Confirm whether the background instance-launching thread has finished
# before installing software on the nodes. (Fixes the Python 2 print
# statement and the "Lauching" typo in the output message.)
print('Launching thread is alive: {}'.format(cluster.instance_launching_thread.is_alive()))
In [12]:
# Install Anaconda on every node -- TODO confirm whether this blocks until
# all instances are reachable (see cclyde docs).
cluster.install_anaconda()
In [13]:
# Conda-install the packages the worker functions below need on every node.
cluster.install_python_packages(['scikit-learn', 'numpy', 'pandas', 'dask', 'futures'], method='conda')
Out[13]:
In [14]:
# Start dask across the cluster; returns the scheduler's address so a
# client can connect to it.
scheduler_address = cluster.launch_dask()
In [15]:
# Connect a distributed Client; the bare `c` on the last line displays the
# client's rich summary in the notebook.
c = Client(address=scheduler_address)
c
Out[15]:
In [16]:
def get_data(kwargs):
    """Load the digits dataset and reduce it to 30 principal components.

    Runs on a worker, so every import happens inside the function. The
    hyper-parameter dict rides along unchanged so the next pipeline stage
    knows which model to build.

    Returns a ``(kwargs, X, y)`` tuple.
    """
    import numpy as np
    from sklearn.datasets import load_digits
    from sklearn.decomposition import PCA

    features, labels = load_digits(return_X_y=True)
    features = np.asarray([sample.flatten() for sample in features])
    features = PCA(n_components=30).fit_transform(features)
    return (kwargs, features, labels)
def model_tester(package):
    """Train and cross-validate an MLP built from the supplied kwargs.

    Mapped across the cluster; ``package`` is the ``(kwargs, X, y)`` tuple
    produced by ``get_data``.

    Returns ``(kwargs, mean score, score std)``.
    """
    import time
    import numpy as np
    from sklearn.neural_network import MLPClassifier
    from sklearn.model_selection import cross_val_score

    kwargs, features, labels = package
    model = MLPClassifier(**kwargs)
    fold_scores = cross_val_score(estimator=model,
                                  X=features,
                                  y=labels,
                                  cv=5)
    return (kwargs, fold_scores.mean(), fold_scores.std())
def score_combiner(package):
    """Merge a model's cross-validation results into its kwargs dict.

    Not needed, but more functions == more pretty colors in the dask
    progress bar; the short random sleep just staggers the tasks.

    Returns a NEW dict containing the kwargs plus 'score'/'std' keys.
    (The original mutated the incoming kwargs in place, silently aliasing
    entries of the shared ``params`` list.)
    """
    import time
    import random
    time.sleep(random.random())
    kwargs, score_m, score_std = package
    result = dict(kwargs)
    result.update({'score': score_m, 'std': score_std})
    return result
def double(n):
    """Useless worker function # 1.

    Sleeps up to one second, then returns the tuple ``(n * 2, 2)``.
    NOTE(review): the original also did ``import sklearn`` without using it;
    the unused import has been removed.
    """
    import time
    import random
    time.sleep(random.random())
    return n * 2, 2
def add_two(package):
    """Useless worker function # 2.

    Unpacks the ``(n, n2)`` pair produced by ``double``, sleeps up to one
    second, and returns their sum.
    """
    import time
    import random

    first, second = package
    time.sleep(random.random())
    return first + second
In [17]:
# Smoke-test the cluster with the two toy worker functions: double each of
# 0..249, then sum each resulting pair. progress() renders a live bar.
futures = c.map(double, range(250))
futures = c.map(add_two, futures)
progress(futures)
In [18]:
# The real sweep: each params dict flows through get_data -> model_tester
# -> score_combiner on the cluster.
futures = c.map(get_data, params)
futures = c.map(model_tester, futures)
futures = c.map(score_combiner, futures)
progress(futures)
In [19]:
# Pull the finished score dicts back to the local process.
results = c.gather(futures)
In [20]:
# Tabulate the gathered score dicts and derive layer/neuron counts from the
# hidden_layer_sizes tuples for plotting.
df = pd.DataFrame(results)
df['n_layers'] = df.hidden_layer_sizes.map(len)
# Every layer in a configuration has the same width, so the first entry is
# the per-layer neuron count.
df['n_neurons'] = df.hidden_layer_sizes.map(lambda sizes: sizes[0])
df.head()
Out[20]:
In [21]:
# Distinct layer counts present in the sweep (used to group traces below).
df.n_layers.unique()
Out[21]:
In [21]:
# One scatter trace per layer count; marker size encodes batch size and
# marker color encodes the cross-validation score.
data = []
for layer_count in df.n_layers.unique():
    subset = df[df.n_layers == layer_count]
    hover_labels = ['{}%<br>Layers: {}'.format(round(v * 100, 2), l)
                    for v, l in zip(subset.score.values, subset.n_layers.values)]
    data.append(go.Scatter(
        x=subset.n_neurons,
        y=subset.n_layers,
        mode='markers',
        text=hover_labels,
        name='{} layers'.format(layer_count),
        marker=dict(
            size=subset.batch_size / 20.0,
            color=subset.score,  # set color equal to a variable
            colorscale='Viridis',
            showscale=False
        )
    ))

layout = dict(title='Best performing models.<br>(size = batch size)',
              xaxis=dict(zeroline=False, title='Neuron Count'),
              yaxis=dict(zeroline=False, title='Layer Count'),
              )
fig = dict(data=data, layout=layout)
py.iplot(fig, filename='styled-scatter')
Out[21]:
In [22]:
# Row with the best mean CV score. `.ix` was removed from pandas and
# `Series.argmax` no longer returns the index label, so use idxmax + loc.
df.loc[df.score.idxmax(), :]
Out[22]:
In [23]:
# Stream jobs through the cluster via a local queue scattered to workers.
# Import Queue in a way that works on both Python 2 and Python 3 (the
# module was renamed `queue` in Python 3).
try:
    from queue import Queue
except ImportError:
    from Queue import Queue
local_q = Queue()
remote_q = c.scatter(local_q)
def long_calc1(n):
    """Simulated slow pipeline stage 1: sleep up to 1s, return ``n + 2``."""
    import time
    import random

    time.sleep(random.random())
    return 2 + n
def long_calc2(n):
    """Simulated slow pipeline stage 2: sleep up to 1s, return ``n * 2``."""
    import time
    import random

    delay = random.random()
    time.sleep(delay)
    return n * 2
def long_calc3(n):
    """Simulated slow pipeline stage 3: sleep up to 1s, return ``n - 2``."""
    import time
    import random

    time.sleep(random.random())
    return n - 2
# Chain the three stages over the scattered queue: dask consumes jobs from
# remote_q as they arrive and pushes finished values onto result_q.
long_calc1_q = c.map(long_calc1, remote_q)
long_calc2_q = c.map(long_calc2, long_calc1_q)
long_calc3_q = c.map(long_calc3, long_calc2_q)
result_q = c.gather(long_calc3_q)
In [24]:
# Finished results waiting locally (0 until jobs are fed in below).
result_q.qsize()
Out[24]:
In [25]:
def start_jobs():
    """Feed 500 job numbers into the local queue, pausing up to one second
    between each so results trickle through the pipeline.
    """
    for job_number in range(500):
        time.sleep(random.random())
        local_q.put(job_number)


# Produce jobs from a background thread so the notebook stays responsive.
thread = threading.Thread(target=start_jobs)
thread.start()
In [26]:
def get_jobs():
    """Continuously print finished results as they come off the result
    queue.

    NOTE: the loop never breaks, so this thread blocks on ``.get()`` until
    the interpreter exits. (Python 2 print statement converted to the
    print() call used elsewhere in the notebook.)
    """
    while True:
        print(result_q.get())


# Consume results from a background thread.
finish_thread = threading.Thread(target=get_jobs)
finish_thread.start()
In [27]:
# Shut down dask and terminate all EC2 instances so the cluster stops billing.
cluster.terminate_cluster()
In [ ]: